1use std::{
2 collections::{BTreeMap, HashMap, HashSet},
3 path::Path,
4 sync::Arc,
5};
6
7use anyhow::{Context, Result};
8use async_stream::try_stream;
9use chrono::{DateTime, TimeZone, Utc};
10use lance::Dataset;
11use lance::dataset::{AutoCleanupParams, WriteMode, WriteParams};
12use lance::deps::arrow_array::{
13 Array, FixedSizeListArray, Float16Array, Float32Array, Int32Array, LargeBinaryArray,
14 LargeStringArray, RecordBatch, RecordBatchIterator, StringArray, TimestampMicrosecondArray,
15 UInt64Array, new_null_array,
16};
17use lance::deps::arrow_schema::{DataType, Field, Schema, TimeUnit};
18use lance_file::version::LanceFileVersion;
19use lance_index::scalar::{BuiltinIndexType, FullTextSearchQuery};
20use serde::{Deserialize, Serialize, de::DeserializeOwned};
21use serde_json::Value;
22use tokio_stream::{Stream, StreamExt};
23
24use crate::{
25 config, embed,
26 substrate::{
27 Handle, IndexIntent, IndexParamsKind, IndexStatus, IndexTrigger, MaintenancePolicy,
28 OptimizeProgressFn, PhaseOutcome, Predicate, ScalarValue, ScanOpts, Table,
29 TableOptimizeOutcome, TableSizes, VECTOR_INDEX_ACTIVATION_ROWS,
30 },
31 wire::{
32 FileData, Message, Part, PartKind, ResponseMode, Role, SUMMARY_PART_TYPES, Session,
33 SessionFrom,
34 },
35};
36use url::Url;
37
38#[derive(Debug)]
39pub struct Store {
40 handle: Handle,
41}
42
43#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
44pub struct LanceArchiveCounts {
45 pub sessions: usize,
46 pub messages: usize,
47 pub parts: usize,
48}
49
50#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
51pub struct LanceArchiveVersions {
52 pub sessions: u64,
53 pub messages: u64,
54 pub parts: u64,
55}
56
57#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
58pub struct LanceArchiveExport {
59 pub rows: LanceArchiveCounts,
60 pub source_versions: LanceArchiveVersions,
61}
62
63#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
64pub struct LanceArchiveImport {
65 pub rows: LanceArchiveCounts,
66 pub inserted: LanceArchiveCounts,
67}
68
69#[derive(Debug, Clone, Default)]
70pub struct IndexIntents {
71 pub sessions: Vec<IndexIntent>,
72 pub messages: Vec<IndexIntent>,
73 pub parts: Vec<IndexIntent>,
74}
75
76impl IndexIntents {
77 fn all(&self) -> [(Table, &[IndexIntent]); 3] {
78 [
79 (Table::Sessions, &self.sessions),
80 (Table::Messages, &self.messages),
81 (Table::Parts, &self.parts),
82 ]
83 }
84}
85
/// A message row selected for (re-)embedding.
#[derive(Clone, Debug, PartialEq)]
pub struct PendingMessage {
    /// Id of the session that owns the message.
    pub session_id: String,
    /// Message id.
    pub id: String,
    /// Contents of the message's `search_text` column, i.e. the text to embed.
    pub search_text: String,
}
95
/// A message key together with the embedding vector to store on that row.
#[derive(Clone, Debug, PartialEq)]
pub struct EmbeddedMessage {
    /// Id of the session that owns the message.
    pub session_id: String,
    /// Message id.
    pub id: String,
    /// Embedding values to write into the row's `vector` column.
    pub vector: Vec<f32>,
}
104
105#[derive(Debug, Clone, PartialEq)]
107pub struct MessageMeta {
108 pub message_id: String,
109 pub session_id: String,
110 pub role: String,
111 pub project: String,
112 pub source_agent: String,
113 pub timestamp: DateTime<Utc>,
114 pub search_text: String,
115}
116
/// Composite `(session_id, message_id)` key of a message row.
///
/// Field order matters: the derived ordering compares `session_id` first.
#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct MessageKey {
    /// Owning session id (primary sort key).
    pub session_id: String,
    /// Message id within the session (secondary sort key).
    pub message_id: String,
}
122
/// Per-row result of an upsert, as derived from merge-insert counts by
/// `statuses_from_inserted`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum UpsertStatus {
    /// The row was newly inserted.
    Inserted,
    /// The row's key already existed.
    Matched,
}
128
129#[derive(Debug, Default)]
134pub struct OptimizeOutcome {
135 pub tables: Vec<TableOptimizeOutcome>,
136}
137
138impl OptimizeOutcome {
139 pub fn any_indices_failed(&self) -> bool {
142 self.tables.iter().any(|t| t.indices.is_failed())
143 }
144
145 pub fn into_result(self) -> Result<Self> {
149 for table in &self.tables {
150 if let PhaseOutcome::Failed(error) = &table.indices {
151 anyhow::bail!(
152 "indices phase failed on {}: {error:#}",
153 table.table.as_str()
154 );
155 }
156 if let PhaseOutcome::Failed(error) = &table.compaction {
157 anyhow::bail!(
158 "compaction phase failed on {}: {error:#}",
159 table.table.as_str()
160 );
161 }
162 }
163 Ok(self)
164 }
165}
166
167#[derive(Debug, Clone)]
170pub struct CorpusStats {
171 pub data_url: Url,
172 pub totals: RowTotals,
173 pub adapters: Vec<AdapterStats>,
181 pub include_subagents: bool,
185}
186
/// Row counts of the three archive tables.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct RowTotals {
    /// Rows in the sessions table.
    pub sessions: u64,
    /// Rows in the messages table.
    pub messages: u64,
    /// Rows in the parts table.
    pub parts: u64,
}
193
/// Progress counter for an embedding run.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct EmbeddingProgress {
    /// Messages embedded so far.
    pub embedded: usize,
    /// Total messages to embed.
    pub total: usize,
    /// Identifier of the embedding model in use.
    pub model: &'static str,
}
205
206#[derive(Debug, Clone)]
207pub struct AdapterStats {
208 pub adapter: String,
212 pub sessions: u64,
213 pub messages: u64,
214 pub projects: Vec<ProjectStats>,
217}
218
/// Message and distinct-session counts for one project under an adapter.
#[derive(Clone, Debug)]
pub struct ProjectStats {
    /// Project name.
    pub project: String,
    /// Distinct session ids seen for this project.
    pub sessions: u64,
    /// Messages counted for this project.
    pub messages: u64,
}
225
/// Running totals for one `(source_agent, project)` group during `corpus_stats`.
#[derive(Default)]
struct GroupAccumulator {
    /// Messages seen in the group.
    messages: u64,
    /// Distinct session ids seen in the group.
    session_ids: HashSet<String>,
}
231
232#[derive(Debug, Clone, Copy)]
233pub struct MessageWrite<'a> {
234 pub message: &'a Message,
235 pub parts: &'a [Part],
236 pub search_text: Option<&'a str>,
237}
238
239impl Store {
240 pub async fn open(location: &Url) -> Result<Self> {
246 Ok(Self {
247 handle: Handle::open(location).await?,
248 })
249 }
250
251 pub async fn open_with_options(
257 location: &Url,
258 storage_options: std::collections::HashMap<String, String>,
259 caps: crate::substrate::RuntimeCaps,
260 ) -> Result<Self> {
261 Ok(Self {
262 handle: Handle::open_with_options(location, storage_options, caps).await?,
263 })
264 }
265
266 pub async fn open_local(path: impl AsRef<std::path::Path>) -> Result<Self> {
271 let url = config::url_for_path(path)?;
272 Self::open_with_options(
273 &url,
274 std::collections::HashMap::new(),
275 crate::substrate::RuntimeCaps::default(),
276 )
277 .await
278 }
279
280 pub async fn export_clean_lance_datasets(&self, dest: &Path) -> Result<LanceArchiveExport> {
288 std::fs::create_dir_all(dest)
289 .with_context(|| format!("failed to create archive staging dir {}", dest.display()))?;
290 let (sessions, sessions_version) = self
291 .export_clean_table(Table::Sessions, &dest.join("sessions.lance"))
292 .await?;
293 let (messages, messages_version) = self
294 .export_clean_table(Table::Messages, &dest.join("messages.lance"))
295 .await?;
296 let (parts, parts_version) = self
297 .export_clean_table(Table::Parts, &dest.join("parts.lance"))
298 .await?;
299 Ok(LanceArchiveExport {
300 rows: LanceArchiveCounts {
301 sessions,
302 messages,
303 parts,
304 },
305 source_versions: LanceArchiveVersions {
306 sessions: sessions_version,
307 messages: messages_version,
308 parts: parts_version,
309 },
310 })
311 }
312
313 pub async fn import_clean_lance_datasets(&self, source: &Path) -> Result<LanceArchiveImport> {
314 let sessions_dataset =
315 open_archive_table(Table::Sessions, &source.join("sessions.lance")).await?;
316 let messages_dataset =
317 open_archive_table(Table::Messages, &source.join("messages.lance")).await?;
318 let parts_dataset = open_archive_table(Table::Parts, &source.join("parts.lance")).await?;
319 let (sessions, sessions_inserted) = self
320 .import_clean_table(Table::Sessions, sessions_dataset)
321 .await?;
322 let (messages, messages_inserted) = self
323 .import_clean_table(Table::Messages, messages_dataset)
324 .await?;
325 let (parts, parts_inserted) = self.import_clean_table(Table::Parts, parts_dataset).await?;
326 Ok(LanceArchiveImport {
327 rows: LanceArchiveCounts {
328 sessions,
329 messages,
330 parts,
331 },
332 inserted: LanceArchiveCounts {
333 sessions: sessions_inserted,
334 messages: messages_inserted,
335 parts: parts_inserted,
336 },
337 })
338 }
339
340 async fn export_clean_table(&self, table: Table, dest: &Path) -> Result<(usize, u64)> {
341 let dataset = self.handle.dataset(table).await?;
342 let source_version = dataset.version_id();
343 let schema = export_schema(table);
344 let mut stream = dataset
345 .scan()
346 .try_into_stream()
347 .await
348 .with_context(|| format!("failed to scan {} for archive export", table.as_str()))?;
349 let dest_uri = dest
350 .to_str()
351 .with_context(|| format!("archive path is not UTF-8: {}", dest.display()))?;
352
353 let mut rows = 0usize;
354 let mut wrote = false;
355 while let Some(batch) = stream.next().await {
356 let batch = batch
357 .with_context(|| format!("failed to read {} archive batch", table.as_str()))?;
358 rows += batch.num_rows();
359 let reader = RecordBatchIterator::new([Ok(batch.clone())], batch.schema());
360 let mut params = write_params_for_create();
361 if wrote {
362 params.mode = WriteMode::Append;
363 }
364 Dataset::write(reader, dest_uri, Some(params))
365 .await
366 .with_context(|| format!("failed to write {} archive table", table.as_str()))?;
367 wrote = true;
368 }
369
370 if !wrote {
371 let batch = RecordBatch::new_empty(schema.clone());
372 let reader = RecordBatchIterator::new([Ok(batch)], schema);
373 Dataset::write(reader, dest_uri, Some(write_params_for_create()))
374 .await
375 .with_context(|| {
376 format!("failed to write empty {} archive table", table.as_str())
377 })?;
378 }
379 Ok((rows, source_version))
380 }
381
382 async fn import_clean_table(&self, table: Table, dataset: Dataset) -> Result<(usize, usize)> {
383 let mut stream = dataset
384 .scan()
385 .try_into_stream()
386 .await
387 .with_context(|| format!("failed to scan {} archive table", table.as_str()))?;
388 let mut rows = 0usize;
389 let mut inserted = 0usize;
390 while let Some(batch) = stream.next().await {
391 let batch = batch
392 .with_context(|| format!("failed to read {} archive batch", table.as_str()))?;
393 let row_count = batch.num_rows();
394 rows += row_count;
395 inserted += self
396 .handle
397 .merge_insert(table, batch, row_count)
398 .await
399 .with_context(|| format!("failed to import {} archive table", table.as_str()))?
400 as usize;
401 }
402 Ok((rows, inserted))
403 }
404
405 pub async fn upsert_sessions(&self, sessions: &[Session]) -> Result<Vec<UpsertStatus>> {
406 if sessions.is_empty() {
407 return Ok(Vec::new());
408 }
409 let batches = sessions_batches(sessions)?;
410 let inserted = merge_insert_chunks(&self.handle, Table::Sessions, batches).await?;
411 Ok(statuses_from_inserted(sessions.len(), inserted))
413 }
414
415 async fn upsert_session_batch(
439 &self,
440 substreams: Vec<CompletedSubstream>,
441 ) -> Result<(Vec<RowOutcome>, BatchCounts)> {
442 if substreams.is_empty() {
443 return Ok((Vec::new(), BatchCounts::default()));
444 }
445
446 let mut outcomes: Vec<RowOutcome> = Vec::with_capacity(substreams.len());
447 let mut counts = BatchCounts::default();
448
449 let mut merged: Vec<CompletedSubstream> = Vec::with_capacity(substreams.len());
453 let mut by_session_id: std::collections::HashMap<String, usize> =
454 std::collections::HashMap::with_capacity(substreams.len());
455 for substream in substreams {
456 if let Some(&existing_idx) = by_session_id.get(&substream.session.id) {
457 let existing = &merged[existing_idx];
458 if existing.session.source_agent != substream.session.source_agent
459 || existing.session.project != substream.session.project
460 {
461 let reason = if existing.session.source_agent != substream.session.source_agent
466 {
467 IngestError::ImmutableField {
468 field: "source_agent",
469 session_id: substream.session.id.clone(),
470 stored: existing.session.source_agent.clone(),
471 attempted: substream.session.source_agent.clone(),
472 }
473 } else {
474 IngestError::ImmutableField {
475 field: "project",
476 session_id: substream.session.id.clone(),
477 stored: (*existing.session.project).clone(),
478 attempted: (*substream.session.project).clone(),
479 }
480 };
481 let field = match &reason {
482 IngestError::ImmutableField { field, .. } => Some(*field),
483 };
484 let reason_key = match field {
485 Some("project") => DROP_REASON_IMMUTABLE_PROJECT,
486 Some("source_agent") => DROP_REASON_IMMUTABLE_SOURCE_AGENT,
487 _ => DROP_REASON_UNCATEGORIZED,
488 };
489 outcomes.extend(error_outcomes_for_substream(
490 substream.session_index,
491 &substream.session,
492 &substream.messages,
493 reason.to_string(),
494 field,
495 reason_key,
496 ));
497 continue;
498 }
499 let existing = &mut merged[existing_idx];
504 let mut seen: std::collections::HashSet<String> = existing
505 .messages
506 .iter()
507 .map(|m| m.message.id().to_owned())
508 .collect();
509 for msg in substream.messages {
510 if seen.insert(msg.message.id().to_owned()) {
511 existing.messages.push(msg);
512 }
513 }
514 continue;
515 }
516 by_session_id.insert(substream.session.id.clone(), merged.len());
517 merged.push(substream);
518 }
519
520 let session_id_values: Vec<ScalarValue> = merged
525 .iter()
526 .map(|substream| ScalarValue::String(substream.session.id.clone()))
527 .collect();
528 let existing_sessions: std::collections::HashMap<String, Session> =
529 if session_id_values.is_empty() {
530 std::collections::HashMap::new()
531 } else {
532 let batch = self
533 .handle
534 .scan_batch(
535 Table::Sessions,
536 Some(&Predicate::In("id", session_id_values.clone())),
537 &[],
538 )
539 .await?;
540 let mut map = std::collections::HashMap::with_capacity(batch.num_rows());
541 for row in 0..batch.num_rows() {
542 let session = session_from_batch(&batch, row)?;
543 map.insert(session.id.clone(), session);
544 }
545 map
546 };
547 let existing_message_pks: HashSet<(String, String)> = if session_id_values.is_empty() {
548 HashSet::new()
549 } else {
550 let batch = self
551 .handle
552 .scan_batch(
553 Table::Messages,
554 Some(&Predicate::In("session_id", session_id_values.clone())),
555 &["session_id", "id"],
556 )
557 .await?;
558 let mut set = HashSet::with_capacity(batch.num_rows());
559 for row in 0..batch.num_rows() {
560 let sid = string(&batch, "session_id", row)?.context("session_id is null")?;
561 let mid = string(&batch, "id", row)?.context("message id is null")?;
562 set.insert((sid, mid));
563 }
564 set
565 };
566 let existing_part_pks: HashSet<(String, String, String)> = if session_id_values.is_empty() {
567 HashSet::new()
568 } else {
569 let batch = self
570 .handle
571 .scan_batch(
572 Table::Parts,
573 Some(&Predicate::In("session_id", session_id_values)),
574 &["session_id", "message_id", "id"],
575 )
576 .await?;
577 let mut set = HashSet::with_capacity(batch.num_rows());
578 for row in 0..batch.num_rows() {
579 let sid = string(&batch, "session_id", row)?.context("session_id is null")?;
580 let mid = string(&batch, "message_id", row)?.context("message_id is null")?;
581 let pid = string(&batch, "id", row)?.context("part id is null")?;
582 set.insert((sid, mid, pid));
583 }
584 set
585 };
586
587 let mut writeable: Vec<CompletedSubstream> = Vec::with_capacity(merged.len());
588 for substream in merged {
589 if let Some(existing) = existing_sessions.get(&substream.session.id)
590 && let Err(failure) = ensure_immutable_match(existing, &substream.session)
591 {
592 let field = match &failure {
593 IngestError::ImmutableField { field, .. } => Some(*field),
594 };
595 let reason_key = match field {
596 Some("project") => DROP_REASON_IMMUTABLE_PROJECT,
597 Some("source_agent") => DROP_REASON_IMMUTABLE_SOURCE_AGENT,
598 _ => DROP_REASON_UNCATEGORIZED,
599 };
600 outcomes.extend(error_outcomes_for_substream(
601 substream.session_index,
602 &substream.session,
603 &substream.messages,
604 failure.to_string(),
605 field,
606 reason_key,
607 ));
608 continue;
609 }
610 writeable.push(substream);
611 }
612
613 if writeable.is_empty() {
614 outcomes.sort_by_key(|outcome| outcome.index);
615 return Ok((outcomes, counts));
616 }
617
618 let sessions_owned: Vec<Session> = writeable
619 .iter()
620 .map(|substream| substream.session.clone())
621 .collect();
622 let message_rows: Vec<MessageBatchRow<'_>> = writeable
623 .iter()
624 .flat_map(|substream| {
625 substream.messages.iter().map(|buffered| MessageBatchRow {
626 message: &buffered.message,
627 source_agent: &substream.session.source_agent,
628 project: &substream.session.project,
629 search_text: buffered.search_text.as_deref(),
630 })
631 })
632 .collect();
633 let part_rows: Vec<Part> = writeable
634 .iter()
635 .flat_map(|substream| {
636 substream.messages.iter().flat_map(|buffered| {
637 buffered
638 .parts
639 .iter()
640 .map(|buffered_part| buffered_part.part.clone())
641 })
642 })
643 .collect();
644
645 let session_batches = sessions_batches(&sessions_owned)?;
646 let message_batches = messages_batches(&message_rows)?;
647 let part_batches = parts_batches(&part_rows)?;
648
649 let (_sessions_inserted, _messages_inserted, _parts_inserted) = tokio::try_join!(
657 merge_insert_chunks(&self.handle, Table::Sessions, session_batches),
658 merge_insert_chunks(&self.handle, Table::Messages, message_batches),
659 merge_insert_chunks(&self.handle, Table::Parts, part_batches),
660 )?;
661
662 for substream in &writeable {
663 outcomes.extend(success_outcomes_for_substream(
664 substream.session_index,
665 &substream.session,
666 &substream.messages,
667 &existing_sessions,
668 &existing_message_pks,
669 &existing_part_pks,
670 &mut counts,
671 ));
672 }
673
674 outcomes.sort_by_key(|outcome| outcome.index);
675 Ok((outcomes, counts))
676 }
677
678 pub async fn upsert_messages(
679 &self,
680 session: &Session,
681 messages: &[MessageWrite<'_>],
682 ) -> Result<Vec<UpsertStatus>> {
683 if messages.is_empty() {
684 return Ok(Vec::new());
685 }
686
687 let rows = messages
688 .iter()
689 .map(|write| MessageBatchRow {
690 message: write.message,
691 source_agent: &session.source_agent,
692 project: &session.project,
693 search_text: write.search_text,
694 })
695 .collect::<Vec<_>>();
696 let batches = messages_batches(&rows)?;
697 let inserted = merge_insert_chunks(&self.handle, Table::Messages, batches).await?;
698 Ok(statuses_from_inserted(messages.len(), inserted))
699 }
700
701 pub async fn upsert_parts(&self, parts: &[Part]) -> Result<Vec<UpsertStatus>> {
702 if parts.is_empty() {
703 return Ok(Vec::new());
704 }
705 let batches = parts_batches(parts)?;
706 let inserted = merge_insert_chunks(&self.handle, Table::Parts, batches).await?;
707 Ok(statuses_from_inserted(parts.len(), inserted))
708 }
709
710 pub async fn get_session(&self, session_id: &str) -> Result<Option<SessionWithMessages>> {
711 let Some(session) = self.find_session(session_id).await? else {
712 return Ok(None);
713 };
714 let messages = self.messages_for_session(session_id).await?;
715 Ok(Some(SessionWithMessages { session, messages }))
716 }
717
718 pub async fn session_ids(&self) -> Result<Vec<String>> {
720 let batch = self
721 .handle
722 .scan_batch(Table::Sessions, None, &["id"])
723 .await?;
724 let mut ids = Vec::with_capacity(batch.num_rows());
725 for row in 0..batch.num_rows() {
726 if let Some(id) = string(&batch, "id", row)? {
727 ids.push(id);
728 }
729 }
730 Ok(ids)
731 }
732
733 pub async fn child_sessions(&self, parent_session_id: &str) -> Result<Vec<Session>> {
734 let batch = self
735 .handle
736 .scan_batch(
737 Table::Sessions,
738 Some(&Predicate::Eq(
739 "parent_session_id",
740 parent_session_id.into(),
741 )),
742 &[
743 "id",
744 "parent_session_id",
745 "parent_message_id",
746 "source_agent",
747 "created_at",
748 "project",
749 "options",
750 ],
751 )
752 .await?;
753 let mut sessions = Vec::with_capacity(batch.num_rows());
754 for row in 0..batch.num_rows() {
755 sessions.push(session_from_batch(&batch, row)?);
756 }
757 sessions.sort_by(|left, right| left.id.cmp(&right.id));
758 Ok(sessions)
759 }
760
761 pub async fn session_last_ingested_at(&self) -> Result<HashMap<String, DateTime<Utc>>> {
767 use lance::deps::arrow_array::UInt64Array;
768
769 let dataset = self.handle.dataset(Table::Sessions).await?;
770 let version_list = dataset.versions().await?;
771 let versions: HashMap<u64, DateTime<Utc>> = version_list
772 .iter()
773 .map(|v| (v.version, v.timestamp))
774 .collect();
775 let oldest_visible_ts = version_list.iter().map(|v| v.timestamp).min();
783
784 let scanner = self
785 .handle
786 .scan(
787 Table::Sessions,
788 ScanOpts::project_only(&["id", "_row_last_updated_at_version"]),
789 )
790 .await?;
791 let mut stream = scanner.try_into_stream().await?;
792 let mut out: HashMap<String, DateTime<Utc>> = HashMap::new();
793 while let Some(batch) = stream.next().await {
794 let batch = batch?;
795 let version_array = batch
796 .column_by_name("_row_last_updated_at_version")
797 .context("missing _row_last_updated_at_version column")?
798 .as_any()
799 .downcast_ref::<UInt64Array>()
800 .context("_row_last_updated_at_version is not UInt64")?;
801 for row in 0..batch.num_rows() {
802 let Some(id) = string(&batch, "id", row)? else {
803 continue;
804 };
805 if version_array.is_null(row) {
806 continue;
807 }
808 let version = version_array.value(row);
809 let ts = versions.get(&version).copied().or(oldest_visible_ts);
810 if let Some(ts) = ts {
811 out.insert(id, ts);
812 }
813 }
814 }
815 Ok(out)
816 }
817
818 pub async fn session_view(
825 &self,
826 session_id: &str,
827 params: SessionViewParams<'_>,
828 ) -> Result<GetLookup<SessionPage>> {
829 let Some(session) = self.find_session(session_id).await? else {
830 return Ok(GetLookup::NotFound);
831 };
832
833 let mut rows = match params.mode {
834 ResponseMode::Conversational => self
835 .scan_conversational_messages(session_id)
836 .await?
837 .into_iter()
838 .map(|row| ScanRow {
839 id: row.message_id,
840 role: row.role,
841 timestamp: row.timestamp,
842 text: Some(row.text.into_inner()),
843 content: None,
844 })
845 .collect(),
846 ResponseMode::Complete | ResponseMode::Verbatim => {
847 self.scan_all_messages(session_id).await?
848 }
849 };
850 rows.sort_by(|a, b| a.timestamp.cmp(&b.timestamp).then_with(|| a.id.cmp(&b.id)));
851
852 let start_at = match params.after_id {
853 Some(after) => match rows.iter().position(|row| row.id == after) {
856 Some(idx) => idx + 1,
857 None => return Ok(GetLookup::UnknownAfterId),
858 },
859 None => 0,
860 };
861 let remaining = rows.get(start_at..).unwrap_or(&[]);
862 let (emitted, messages_remaining) = match params.session_from {
863 SessionFrom::Start => {
864 let n = page_by(remaining, params.limit, params.budget_bytes, |row| {
865 row.text.as_deref().map_or(0, str::len)
866 });
867 (&remaining[..n], remaining.len() - n)
868 }
869 SessionFrom::End => {
873 let mut bytes = 0usize;
874 let mut start = remaining.len();
875 for row in remaining.iter().rev() {
876 if remaining.len() - start >= params.limit {
877 break;
878 }
879 let size = row.text.as_deref().map_or(0, str::len);
880 if start < remaining.len() && bytes + size > params.budget_bytes {
881 break;
882 }
883 bytes += size;
884 start -= 1;
885 }
886 (&remaining[start..], start)
887 }
888 };
889 let ids: Vec<String> = emitted.iter().map(|row| row.id.clone()).collect();
890
891 let mut parts_by_message = match params.mode {
894 ResponseMode::Verbatim => self.parts_for_messages(session_id, &ids).await?,
895 ResponseMode::Conversational | ResponseMode::Complete => {
896 self.summary_parts_for_messages(session_id, &ids).await?
897 }
898 };
899 let messages = emitted
900 .iter()
901 .map(|row| RetrievedMessage {
902 id: row.id.clone(),
903 role: row.role,
904 timestamp: row.timestamp,
905 text: row.text.clone(),
906 content: row.content.clone(),
907 parts: parts_by_message
908 .remove(&(session_id.to_owned(), row.id.clone()))
909 .unwrap_or_default(),
910 })
911 .collect();
912
913 Ok(GetLookup::Found(SessionPage {
914 session,
915 messages,
916 messages_remaining,
917 }))
918 }
919
920 pub async fn message_view(
926 &self,
927 message_id: &str,
928 params: MessageViewParams<'_>,
929 ) -> Result<GetLookup<MessagePage>> {
930 let Some(session_id) = self.session_id_for_message(message_id).await? else {
931 return Ok(GetLookup::NotFound);
932 };
933 let Some(session) = self.find_session(&session_id).await? else {
934 return Ok(GetLookup::NotFound);
935 };
936 let mut rows = self.scan_all_messages(&session_id).await?;
937 if matches!(params.mode, ResponseMode::Conversational) {
943 rows.retain(|row| row.text.is_some() || row.id == message_id);
944 }
945 rows.sort_by(|a, b| a.timestamp.cmp(&b.timestamp).then_with(|| a.id.cmp(&b.id)));
946 let Some(target_pos) = rows.iter().position(|row| row.id == message_id) else {
947 return Ok(GetLookup::NotFound);
948 };
949
950 let start = target_pos.saturating_sub(params.context_depth);
951 let end = (target_pos + params.context_depth + 1).min(rows.len());
952 let window = &rows[start..end];
953 let window_ids: Vec<String> = window.iter().map(|row| row.id.clone()).collect();
954 let mut parts_by_message = self.parts_for_messages(&session_id, &window_ids).await?;
957
958 let all_parts = parts_by_message
959 .remove(&(session_id.clone(), message_id.to_owned()))
960 .unwrap_or_default();
961 let start_part = match params.after_id {
962 Some(after) => match all_parts.iter().find(|part| part.id == after) {
966 Some(anchor) => all_parts
967 .iter()
968 .position(|part| part.ordinal > anchor.ordinal)
969 .unwrap_or(all_parts.len()),
970 None => return Ok(GetLookup::UnknownAfterId),
971 },
972 None => 0,
973 };
974 let remaining_parts = all_parts.get(start_part..).unwrap_or(&[]);
975 let part_count = page_by(remaining_parts, params.limit, params.budget_bytes, |part| {
976 serde_json::to_string(part).map_or(0, |json| json.len())
977 });
978 let target_parts = remaining_parts[..part_count].to_vec();
979 let target_parts_remaining = remaining_parts.len() - part_count;
980
981 let target_row = &rows[target_pos];
982 let target = RetrievedMessage {
983 id: target_row.id.clone(),
984 role: target_row.role,
985 timestamp: target_row.timestamp,
986 text: target_row.text.clone(),
987 content: target_row.content.clone(),
988 parts: Vec::new(),
990 };
991 let siblings = window
992 .iter()
993 .enumerate()
994 .filter(|(idx, _)| start + idx != target_pos)
995 .map(|(_, row)| RetrievedMessage {
996 id: row.id.clone(),
997 role: row.role,
998 timestamp: row.timestamp,
999 text: row.text.clone(),
1000 content: row.content.clone(),
1001 parts: parts_by_message
1002 .get(&(session_id.clone(), row.id.clone()))
1003 .cloned()
1004 .unwrap_or_default(),
1005 })
1006 .collect();
1007
1008 Ok(GetLookup::Found(MessagePage {
1009 session,
1010 target,
1011 target_parts,
1012 target_parts_remaining,
1013 siblings,
1014 }))
1015 }
1016
1017 async fn scan_all_messages(&self, session_id: &str) -> Result<Vec<ScanRow>> {
1018 let batch = self
1019 .handle
1020 .scan_batch(
1021 Table::Messages,
1022 Some(&Predicate::Eq("session_id", session_id.into())),
1023 &["id", "timestamp", "role", "search_text", "content"],
1024 )
1025 .await?;
1026 let mut rows = Vec::with_capacity(batch.num_rows());
1027 for row in 0..batch.num_rows() {
1028 let id = string(&batch, "id", row)?.context("message id is null")?;
1029 let role =
1030 role_from_str(&string(&batch, "role", row)?.context("message role is null")?)?;
1031 let timestamp = datetime(&batch, "timestamp", row)?;
1032 rows.push(ScanRow {
1033 id,
1034 role,
1035 timestamp,
1036 text: string(&batch, "search_text", row)?,
1037 content: string(&batch, "content", row)?,
1038 });
1039 }
1040 Ok(rows)
1041 }
1042
1043 pub async fn scan_conversational_messages(
1047 &self,
1048 session_id: &str,
1049 ) -> Result<Vec<ConversationalRow>> {
1050 let filter = Predicate::And(vec![
1051 Predicate::Eq("session_id", session_id.into()),
1052 Predicate::IsNotNull("search_text"),
1053 ]);
1054 let batch = self
1055 .handle
1056 .scan_batch(
1057 Table::Messages,
1058 Some(&filter),
1059 &["id", "timestamp", "role", "search_text"],
1060 )
1061 .await?;
1062
1063 let mut rows = Vec::with_capacity(batch.num_rows());
1064 for row in 0..batch.num_rows() {
1065 let message_id = string(&batch, "id", row)?.context("message id is null")?;
1066 let role =
1067 role_from_str(&string(&batch, "role", row)?.context("message role is null")?)?;
1068 let timestamp = datetime(&batch, "timestamp", row)?;
1069 let text_str = string(&batch, "search_text", row)?.context(
1070 "search_text null after IsNotNull pushdown - storage invariant violated",
1071 )?;
1072 rows.push(ConversationalRow {
1073 session_id: session_id.to_owned(),
1074 message_id,
1075 role,
1076 timestamp,
1077 text: SearchText(text_str),
1078 });
1079 }
1080 rows.sort_by(|a, b| {
1081 a.timestamp
1082 .cmp(&b.timestamp)
1083 .then_with(|| a.message_id.cmp(&b.message_id))
1084 });
1085 Ok(rows)
1086 }
1087
1088 pub async fn session_id_for_message(&self, message_id: &str) -> Result<Option<String>> {
1091 let batch = self
1092 .handle
1093 .scan_batch(
1094 Table::Messages,
1095 Some(&Predicate::Eq("id", message_id.into())),
1096 &["session_id"],
1097 )
1098 .await?;
1099 if batch.num_rows() == 0 {
1100 return Ok(None);
1101 }
1102 string(&batch, "session_id", 0)
1103 }
1104
1105 pub async fn row_counts(&self) -> Result<(usize, usize, usize)> {
1106 self.handle.row_counts().await
1107 }
1108
1109 pub async fn dataset(&self, table: Table) -> Result<Arc<Dataset>> {
1113 Ok(Arc::new(self.handle.dataset(table).await?))
1114 }
1115
1116 pub async fn export_write(&self, name: &str, bytes: &[u8]) -> Result<()> {
1118 self.handle.export_write(name, bytes).await
1119 }
1120
1121 pub async fn export_read(&self, name: &str) -> Result<Vec<u8>> {
1123 self.handle.export_read(name).await
1124 }
1125
1126 pub fn export_local_path(&self, name: &str) -> Option<std::path::PathBuf> {
1128 self.handle.export_local_path(name)
1129 }
1130
1131 pub async fn corpus_stats(&self, include_subagents: bool) -> Result<CorpusStats> {
1137 let scanner = self
1138 .handle
1139 .scan(
1140 Table::Messages,
1141 ScanOpts::project_only(&["source_agent", "project", "session_id"]),
1142 )
1143 .await?;
1144 let mut stream = scanner.try_into_stream().await?;
1145 let mut groups: HashMap<(String, String), GroupAccumulator> = HashMap::new();
1146 while let Some(batch) = stream.next().await {
1147 let batch = batch?;
1148 for row in 0..batch.num_rows() {
1149 let source_agent = string(&batch, "source_agent", row)?.unwrap_or_default();
1150 let project = string(&batch, "project", row)?.unwrap_or_default();
1151 let session_id = string(&batch, "session_id", row)?.unwrap_or_default();
1152 let is_subagent = source_agent.contains('/');
1153 if is_subagent && !include_subagents {
1154 continue;
1155 }
1156 let entry = groups.entry((source_agent, project)).or_default();
1157 entry.messages += 1;
1158 entry.session_ids.insert(session_id);
1159 }
1160 }
1161
1162 let (totals_sessions, totals_messages, totals_parts) = self.handle.row_counts().await?;
1163 let totals = RowTotals {
1164 sessions: totals_sessions as u64,
1165 messages: totals_messages as u64,
1166 parts: totals_parts as u64,
1167 };
1168
1169 let mut by_adapter: BTreeMap<String, Vec<ProjectStats>> = BTreeMap::new();
1170 for ((adapter, project), acc) in groups {
1171 by_adapter.entry(adapter).or_default().push(ProjectStats {
1172 project,
1173 sessions: acc.session_ids.len() as u64,
1174 messages: acc.messages,
1175 });
1176 }
1177
1178 let mut adapters = Vec::with_capacity(by_adapter.len());
1179 for (adapter, mut projects) in by_adapter {
1180 projects.sort_by(|a, b| {
1181 b.messages
1182 .cmp(&a.messages)
1183 .then_with(|| a.project.cmp(&b.project))
1184 });
1185 let sessions: u64 = projects.iter().map(|p| p.sessions).sum();
1186 let messages: u64 = projects.iter().map(|p| p.messages).sum();
1187 adapters.push(AdapterStats {
1188 adapter,
1189 sessions,
1190 messages,
1191 projects,
1192 });
1193 }
1194
1195 Ok(CorpusStats {
1196 data_url: self.handle.location().clone(),
1197 totals,
1198 adapters,
1199 include_subagents,
1200 })
1201 }
1202
1203 pub async fn write_embeddings(&self, rows: &[EmbeddedMessage]) -> Result<()> {
1208 if rows.is_empty() {
1209 return Ok(());
1210 }
1211 let batch = embedding_update_batch(rows)?;
1212 self.handle
1213 .merge_update(Table::Messages, batch, rows.len())
1214 .await?;
1215 Ok(())
1216 }
1217
1218 pub fn pending_embedding_messages(&self) -> impl Stream<Item = Result<PendingMessage>> + '_ {
1221 try_stream! {
1222 let filter = Predicate::And(vec![
1223 Predicate::IsNull("vector"),
1224 Predicate::IsNotNull("search_text"),
1225 ]);
1226 let projection: &[&str] = &["session_id", "id", "search_text"];
1227 let scanner = self
1228 .handle
1229 .scan(
1230 Table::Messages,
1231 ScanOpts::with_predicate_and_projection(&filter, projection),
1232 )
1233 .await?;
1234 let mut batches = scanner
1235 .try_into_stream()
1236 .await
1237 .context("failed to open messages stream")?;
1238 while let Some(batch) = batches.next().await {
1239 let batch = batch?;
1240 for row in 0..batch.num_rows() {
1241 yield PendingMessage {
1242 session_id: string(&batch, "session_id", row)?
1243 .context("session_id is null")?,
1244 id: string(&batch, "id", row)?.context("message id is null")?,
1245 search_text: string(&batch, "search_text", row)?
1246 .context("search_text is null")?,
1247 };
1248 }
1249 }
1250 }
1251 }
1252
1253 pub fn pending_or_stale_messages(&self) -> impl Stream<Item = Result<PendingMessage>> + '_ {
1258 try_stream! {
1259 let filter = Predicate::And(vec![
1260 Predicate::IsNotNull("search_text"),
1261 Predicate::Or(vec![
1262 Predicate::IsNull("vector"),
1263 Predicate::Ne("embedding_model", embed::model_id().into()),
1264 ]),
1265 ]);
1266 let projection: &[&str] = &["session_id", "id", "search_text"];
1267 let scanner = self
1268 .handle
1269 .scan(
1270 Table::Messages,
1271 ScanOpts::with_predicate_and_projection(&filter, projection),
1272 )
1273 .await?;
1274 let mut batches = scanner
1275 .try_into_stream()
1276 .await
1277 .context("failed to open pending-or-stale messages stream")?;
1278 while let Some(batch) = batches.next().await {
1279 let batch = batch?;
1280 for row in 0..batch.num_rows() {
1281 yield PendingMessage {
1282 session_id: string(&batch, "session_id", row)?
1283 .context("session_id is null")?,
1284 id: string(&batch, "id", row)?.context("message id is null")?,
1285 search_text: string(&batch, "search_text", row)?
1286 .context("search_text is null")?,
1287 };
1288 }
1289 }
1290 }
1291 }
1292
1293 pub async fn fts_search(
1295 &self,
1296 query: &str,
1297 limit: usize,
1298 filter: &Predicate,
1299 ) -> Result<Vec<(MessageKey, f32)>> {
1300 let mut scanner = self.handle.scanner(Table::Messages, Some(filter)).await?;
1301 scanner.full_text_search(
1302 FullTextSearchQuery::new(query.to_owned()).with_column("search_text".to_owned())?,
1303 )?;
1304 scanner.disable_scoring_autoprojection();
1310 scanner.project(&["session_id", "id", "_score"])?;
1311 scanner.limit(Some(i64::try_from(limit).unwrap_or(i64::MAX)), None)?;
1312 let batch = scanner.try_into_batch().await?;
1313 let mut hits = Vec::with_capacity(batch.num_rows());
1314 for row in 0..batch.num_rows() {
1315 let key = MessageKey {
1316 session_id: string(&batch, "session_id", row)?.context("session_id is null")?,
1317 message_id: string(&batch, "id", row)?.context("fts hit id is null")?,
1318 };
1319 hits.push((key, float32(&batch, "_score", row)?));
1320 }
1321 hits.sort_by(|left, right| {
1329 right
1330 .1
1331 .partial_cmp(&left.1)
1332 .unwrap_or(std::cmp::Ordering::Equal)
1333 .then_with(|| left.0.session_id.cmp(&right.0.session_id))
1334 .then_with(|| left.0.message_id.cmp(&right.0.message_id))
1335 });
1336 Ok(hits)
1337 }
1338
1339 pub async fn searchable_in_scope(&self, filter: &Predicate) -> Result<usize> {
1346 let scope = Predicate::And(vec![Predicate::IsNotNull("search_text"), filter.clone()]);
1347 let dataset = self.handle.dataset(Table::Messages).await?;
1348 dataset
1349 .count_rows(Some(scope.to_lance()))
1350 .await
1351 .map_err(Into::into)
1352 }
1353
1354 pub async fn has_embeddings(&self) -> Result<bool> {
1359 let scope = Predicate::IsNotNull("vector");
1360 let mut scanner = self
1361 .handle
1362 .scan(
1363 Table::Messages,
1364 ScanOpts::with_predicate_and_projection(&scope, &["id"]),
1365 )
1366 .await?;
1367 scanner.limit(Some(1), None)?;
1368 let batch = scanner.try_into_batch().await?;
1369 Ok(batch.num_rows() > 0)
1370 }
1371
1372 pub async fn vector_search(
1380 &self,
1381 query: &[f32],
1382 limit: usize,
1383 filter: &Predicate,
1384 search: Option<&config::SearchConfig>,
1385 ) -> Result<Vec<(MessageKey, f32)>> {
1386 let scope = embedded_scope(filter);
1387 let mut scanner = self.handle.scanner(Table::Messages, Some(&scope)).await?;
1388 let key = Float32Array::from(query.to_vec());
1389 scanner.nearest("vector", &key, limit)?;
1390 if let Some(nprobes) = search.and_then(|cfg| cfg.nprobes) {
1391 scanner.nprobes(nprobes);
1392 }
1393 if let Some(refine_factor) = search.and_then(|cfg| cfg.refine_factor) {
1394 scanner.refine(refine_factor);
1395 }
1396 scanner.disable_scoring_autoprojection();
1400 scanner.project(&["session_id", "id", "_distance"])?;
1401 let batch = scanner.try_into_batch().await?;
1402 let mut hits = Vec::with_capacity(batch.num_rows());
1403 for row in 0..batch.num_rows() {
1404 let key = MessageKey {
1405 session_id: string(&batch, "session_id", row)?.context("session_id is null")?,
1406 message_id: string(&batch, "id", row)?.context("message id is null")?,
1407 };
1408 hits.push((key, float32(&batch, "_distance", row)?));
1409 }
1410 hits.sort_by(|left, right| {
1416 left.1
1417 .partial_cmp(&right.1)
1418 .unwrap_or(std::cmp::Ordering::Equal)
1419 .then_with(|| left.0.session_id.cmp(&right.0.session_id))
1420 .then_with(|| left.0.message_id.cmp(&right.0.message_id))
1421 });
1422 Ok(hits)
1423 }
1424
1425 pub async fn explain_vector_plan(
1428 &self,
1429 query: &[f32],
1430 limit: usize,
1431 filter: &Predicate,
1432 search: Option<&config::SearchConfig>,
1433 ) -> Result<String> {
1434 let scope = embedded_scope(filter);
1435 let mut scanner = self.handle.scanner(Table::Messages, Some(&scope)).await?;
1436 let key = Float32Array::from(query.to_vec());
1437 scanner.nearest("vector", &key, limit)?;
1438 if let Some(nprobes) = search.and_then(|cfg| cfg.nprobes) {
1439 scanner.nprobes(nprobes);
1440 }
1441 if let Some(refine_factor) = search.and_then(|cfg| cfg.refine_factor) {
1442 scanner.refine(refine_factor);
1443 }
1444 scanner
1445 .explain_plan(true)
1446 .await
1447 .context("explain_plan failed")
1448 }
1449
1450 pub async fn explain_fts_plan(
1451 &self,
1452 query: &str,
1453 limit: usize,
1454 filter: &Predicate,
1455 ) -> Result<String> {
1456 let mut scanner = self.handle.scanner(Table::Messages, Some(filter)).await?;
1457 scanner.full_text_search(
1458 FullTextSearchQuery::new(query.to_owned()).with_column("search_text".to_owned())?,
1459 )?;
1460 scanner.project(&["session_id", "id"])?;
1461 scanner.limit(Some(i64::try_from(limit).unwrap_or(i64::MAX)), None)?;
1462 scanner
1463 .explain_plan(true)
1464 .await
1465 .context("explain_plan failed")
1466 }
1467
1468 pub async fn message_metas_by_keys(&self, keys: &[MessageKey]) -> Result<Vec<MessageMeta>> {
1470 if keys.is_empty() {
1471 return Ok(Vec::new());
1472 }
1473 let wanted = keys.iter().cloned().collect::<HashSet<_>>();
1474 let session_ids = keys
1475 .iter()
1476 .map(|key| key.session_id.clone())
1477 .collect::<Vec<_>>();
1478 let message_ids = keys
1479 .iter()
1480 .map(|key| key.message_id.clone())
1481 .collect::<Vec<_>>();
1482 let predicate = Predicate::And(vec![
1483 in_predicate("session_id", &session_ids),
1484 in_predicate("id", &message_ids),
1485 ]);
1486 let batch = self
1487 .handle
1488 .scan_batch(
1489 Table::Messages,
1490 Some(&predicate),
1491 &[
1492 "id",
1493 "session_id",
1494 "role",
1495 "project",
1496 "source_agent",
1497 "timestamp",
1498 "search_text",
1499 ],
1500 )
1501 .await?;
1502 let mut metas = Vec::with_capacity(batch.num_rows());
1503 for row in 0..batch.num_rows() {
1504 let message_id = string(&batch, "id", row)?.context("id is null")?;
1505 let session_id = string(&batch, "session_id", row)?.context("session_id is null")?;
1506 if !wanted.contains(&MessageKey {
1507 session_id: session_id.clone(),
1508 message_id: message_id.clone(),
1509 }) {
1510 continue;
1511 }
1512 metas.push(MessageMeta {
1513 message_id,
1514 session_id,
1515 role: string(&batch, "role", row)?.context("role is null")?,
1516 project: string(&batch, "project", row)?.context("project is null")?,
1517 source_agent: string(&batch, "source_agent", row)?
1518 .context("source_agent is null")?,
1519 timestamp: datetime(&batch, "timestamp", row)?,
1520 search_text: string(&batch, "search_text", row)?.unwrap_or_default(),
1521 });
1522 }
1523 Ok(metas)
1524 }
1525
1526 pub async fn session_message_counts(
1528 &self,
1529 session_ids: &[String],
1530 ) -> Result<BTreeMap<String, usize>> {
1531 if session_ids.is_empty() {
1532 return Ok(BTreeMap::new());
1533 }
1534 let dataset = self.handle.dataset(Table::Messages).await?;
1535 let mut tasks = tokio::task::JoinSet::new();
1536 for session_id in session_ids {
1537 let dataset = dataset.clone();
1538 let session_id = session_id.clone();
1539 tasks.spawn(async move {
1540 let filter = Predicate::Eq("session_id", session_id.as_str().into()).to_lance();
1541 let count = dataset.count_rows(Some(filter)).await?;
1542 anyhow::Ok((session_id, count))
1543 });
1544 }
1545 let mut counts = BTreeMap::new();
1546 while let Some(joined) = tasks.join_next().await {
1547 let (session_id, count) = joined.context("session count task panicked")??;
1548 counts.insert(session_id, count);
1549 }
1550 Ok(counts)
1551 }
1552
1553 pub async fn unindexed_message_backlog(&self) -> Result<usize> {
1556 self.handle
1557 .unindexed_row_count(Table::Messages, MESSAGES_FTS_INDEX)
1558 .await
1559 }
1560
1561 pub async fn unindexed_vector_backlog(&self) -> Result<usize> {
1567 self.handle
1568 .unindexed_row_count(Table::Messages, MESSAGES_VECTOR_INDEX)
1569 .await
1570 }
1571
1572 pub async fn embedding_progress(&self) -> Result<EmbeddingProgress> {
1579 let dataset = self.handle.dataset(Table::Messages).await?;
1580 let embedded = dataset
1581 .count_rows(Some(Predicate::IsNotNull("vector").to_lance()))
1582 .await?;
1583 let total = dataset
1584 .count_rows(Some(Predicate::IsNotNull("search_text").to_lance()))
1585 .await?;
1586 Ok(EmbeddingProgress {
1587 embedded,
1588 total,
1589 model: embed::model_id(),
1590 })
1591 }
1592
1593 pub async fn stale_embedding_count(&self) -> Result<usize> {
1597 let dataset = self.handle.dataset(Table::Messages).await?;
1598 dataset
1599 .count_rows(Some(
1600 Predicate::And(vec![
1601 Predicate::IsNotNull("vector"),
1602 Predicate::Ne("embedding_model", embed::model_id().into()),
1603 ])
1604 .to_lance(),
1605 ))
1606 .await
1607 .map_err(Into::into)
1608 }
1609
1610 pub async fn optimize_indices(
1616 &self,
1617 progress: Option<OptimizeProgressFn>,
1618 maintenance: &MaintenancePolicy,
1619 ) -> Result<OptimizeOutcome> {
1620 let intents = pond_index_intents();
1621 let mut tables = Vec::with_capacity(3);
1622 for (table, intents) in intents.all() {
1623 let outcome = self
1624 .handle
1625 .optimize_table(table, intents, progress.as_ref(), maintenance)
1626 .await;
1627 tables.push(outcome);
1628 }
1629 Ok(OptimizeOutcome { tables })
1630 }
1631
1632 pub async fn build_indices_only(
1638 &self,
1639 progress: Option<OptimizeProgressFn>,
1640 ) -> Result<OptimizeOutcome> {
1641 let policy = pond_index_intents();
1642 let mut tables = Vec::with_capacity(3);
1643 for (table, intents) in policy.all() {
1644 let indices = self
1645 .handle
1646 .optimize_table_indices_only(table, intents, progress.as_ref())
1647 .await;
1648 tables.push(TableOptimizeOutcome {
1649 table,
1650 indices,
1651 compaction: PhaseOutcome::NotAttempted,
1652 });
1653 }
1654 Ok(OptimizeOutcome { tables })
1655 }
1656
1657 #[cfg(test)]
1658 async fn optimize_indices_with_vector_threshold(
1659 &self,
1660 vector_threshold: usize,
1661 ) -> Result<OptimizeOutcome> {
1662 let intents = pond_index_intents_with_vector_threshold(vector_threshold);
1663 let policy = MaintenancePolicy::always_compact();
1664 let mut tables = Vec::with_capacity(3);
1665 for (table, intents) in intents.all() {
1666 let outcome = self
1667 .handle
1668 .optimize_table(table, intents, None, &policy)
1669 .await;
1670 tables.push(outcome);
1671 }
1672 Ok(OptimizeOutcome { tables })
1673 }
1674
1675 pub async fn rebuild_indices(&self, intent_name: Option<&str>) -> Result<()> {
1676 let policy = pond_index_intents();
1677 let mut matched = false;
1678 for (table, intents) in policy.all() {
1679 for intent in intents {
1680 if intent_name.is_none_or(|name| name == intent.name) {
1681 matched = true;
1682 self.handle.rebuild_index(table, intent).await?;
1683 }
1684 }
1685 }
1686 if let Some(name) = intent_name
1687 && !matched
1688 {
1689 anyhow::bail!("unknown index intent {name:?}");
1690 }
1691 Ok(())
1692 }
1693
1694 pub async fn index_status(&self) -> Result<Vec<IndexStatus>> {
1695 let policy = pond_index_intents();
1696 let mut statuses = Vec::new();
1697 for (table, intents) in policy.all() {
1698 statuses.extend(self.handle.index_status(table, intents).await?);
1699 }
1700 Ok(statuses)
1701 }
1702
1703 pub async fn drop_vector_index(&self) -> Result<()> {
1707 match self
1708 .handle
1709 .drop_index(Table::Messages, MESSAGES_VECTOR_INDEX)
1710 .await
1711 {
1712 Ok(()) => Ok(()),
1713 Err(error) => {
1714 let msg = error.to_string();
1715 if msg.contains("not found") || msg.contains("does not exist") {
1716 Ok(())
1717 } else {
1718 Err(error)
1719 }
1720 }
1721 }
1722 }
1723
1724 pub async fn table_sizes(&self) -> Result<TableSizes> {
1727 self.handle.table_sizes().await
1728 }
1729
1730 async fn find_session(&self, session_id: &str) -> Result<Option<Session>> {
1731 let batch = self
1732 .handle
1733 .scan_batch(
1734 Table::Sessions,
1735 Some(&Predicate::Eq("id", session_id.into())),
1736 &[],
1737 )
1738 .await?;
1739 if batch.num_rows() == 0 {
1740 Ok(None)
1741 } else {
1742 Ok(Some(session_from_batch(&batch, 0)?))
1743 }
1744 }
1745
1746 pub async fn message_vector_by_id(&self, message_id: &str) -> Result<Option<Vec<f32>>> {
1752 let batch = self
1753 .handle
1754 .scan_batch(
1755 Table::Messages,
1756 Some(&Predicate::Eq("id", message_id.into())),
1757 &["vector"],
1758 )
1759 .await?;
1760 if batch.num_rows() == 0 {
1761 return Ok(None);
1762 }
1763 let column = batch
1764 .column(0)
1765 .as_any()
1766 .downcast_ref::<FixedSizeListArray>();
1767 let Some(list) = column else {
1768 return Ok(None);
1769 };
1770 if list.is_null(0) {
1771 return Ok(None);
1772 }
1773 let values = list.value(0);
1774 let halves = values
1775 .as_any()
1776 .downcast_ref::<Float16Array>()
1777 .context("messages.vector inner array is not Float16")?;
1778 let widened = (0..halves.len())
1779 .map(|i| halves.value(i).to_f32())
1780 .collect();
1781 Ok(Some(widened))
1782 }
1783
1784 async fn messages_for_session(&self, session_id: &str) -> Result<Vec<MessageWithParts>> {
1785 let batch = self
1786 .handle
1787 .scan_batch(
1788 Table::Messages,
1789 Some(&Predicate::Eq("session_id", session_id.into())),
1790 &[
1791 "session_id",
1792 "id",
1793 "timestamp",
1794 "role",
1795 "content",
1796 "options",
1797 ],
1798 )
1799 .await?;
1800 let mut messages = Vec::with_capacity(batch.num_rows());
1801 for row in 0..batch.num_rows() {
1802 messages.push(message_from_batch(&batch, row)?);
1803 }
1804 messages.sort_by(|left, right| {
1805 left.timestamp()
1806 .cmp(&right.timestamp())
1807 .then_with(|| left.id().cmp(right.id()))
1808 });
1809
1810 let message_ids = messages
1811 .iter()
1812 .map(|message| message.id().to_owned())
1813 .collect::<Vec<_>>();
1814 let mut parts_by_message = self.parts_for_messages(session_id, &message_ids).await?;
1815
1816 Ok(messages
1817 .into_iter()
1818 .map(|message| {
1819 let key = (message.session_id().to_owned(), message.id().to_owned());
1820 let parts = parts_by_message.remove(&key).unwrap_or_default();
1821 MessageWithParts { message, parts }
1822 })
1823 .collect())
1824 }
1825
1826 pub async fn parts_for_messages(
1830 &self,
1831 session_id: &str,
1832 message_ids: &[String],
1833 ) -> Result<BTreeMap<(String, String), Vec<Part>>> {
1834 self.scan_parts(session_id, message_ids, None).await
1835 }
1836
1837 pub async fn summary_parts_for_messages(
1842 &self,
1843 session_id: &str,
1844 message_ids: &[String],
1845 ) -> Result<BTreeMap<(String, String), Vec<Part>>> {
1846 self.scan_parts(session_id, message_ids, Some(SUMMARY_PART_TYPES))
1847 .await
1848 }
1849
1850 async fn scan_parts(
1851 &self,
1852 session_id: &str,
1853 message_ids: &[String],
1854 part_types: Option<&[&str]>,
1855 ) -> Result<BTreeMap<(String, String), Vec<Part>>> {
1856 if message_ids.is_empty() {
1857 return Ok(BTreeMap::new());
1858 }
1859 let mut clauses = vec![
1860 Predicate::Eq("session_id", session_id.into()),
1861 in_predicate("message_id", message_ids),
1862 ];
1863 if let Some(types) = part_types {
1864 clauses.push(Predicate::In(
1865 "type",
1866 types.iter().map(|&t| t.into()).collect(),
1867 ));
1868 }
1869 let predicate = Predicate::And(clauses);
1870 let dataset = std::sync::Arc::new(self.handle.dataset(Table::Parts).await?);
1871 let mut scanner = self
1872 .handle
1873 .scan(
1874 Table::Parts,
1875 ScanOpts::with_predicate_and_projection(
1876 &predicate,
1877 &[
1878 "session_id",
1879 "message_id",
1880 "id",
1881 "ordinal",
1882 "type",
1883 "provenance",
1884 "variant_data",
1885 "options",
1886 ],
1887 ),
1888 )
1889 .await?;
1890 scanner.with_row_address();
1891 let batch = scanner.try_into_batch().await.context("scan failed")?;
1892 let row_addresses = uint64(&batch, "_rowaddr")?;
1893 let mut file_payloads = BTreeMap::<usize, FileData>::new();
1894 let mut file_rows = Vec::<(usize, u64, Vec<u8>)>::new();
1895 for row in 0..batch.num_rows() {
1896 if string(&batch, "type", row)?.as_deref() == Some("file") {
1897 let variant_data =
1898 json_column(&batch, "variant_data", row)?.context("variant_data is null")?;
1899 file_rows.push((row, row_addresses.value(row), variant_data));
1900 }
1901 }
1902 if !file_rows.is_empty() {
1903 let addresses = file_rows
1904 .iter()
1905 .map(|(_, address, _)| *address)
1906 .collect::<Vec<_>>();
1907 let blobs = dataset.take_blobs_by_addresses(&addresses, "data").await?;
1908 for ((row, _, variant_data), blob) in file_rows.into_iter().zip(blobs) {
1909 let payload = file_data_from_blob(&variant_data, &blob.read().await?)?;
1913 file_payloads.insert(row, payload);
1914 }
1915 }
1916 let mut parts_by_message = BTreeMap::<(String, String), Vec<Part>>::new();
1917 for row in 0..batch.num_rows() {
1918 let part = part_from_batch(&batch, row, file_payloads.remove(&row))?;
1919 parts_by_message
1920 .entry((part.session_id.clone(), part.message_id.clone()))
1921 .or_default()
1922 .push(part);
1923 }
1924 for parts in parts_by_message.values_mut() {
1925 parts.sort_by_key(|part| part.ordinal);
1926 }
1927 Ok(parts_by_message)
1928 }
1929}
1930
/// One event of an ingest stream, as fed to `IngestValidator::push`.
///
/// Serialized adjacently tagged: `{"kind": "<variant>", "data": <payload>}`,
/// with snake_case variant names (`session`, `message`, `part`).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "kind", content = "data", rename_all = "snake_case")]
pub enum IngestEvent {
    /// Opens a new session substream, closing the previous one.
    Session(Session),
    /// A message; it must belong to the currently open session.
    Message(Message),
    /// A part; it must belong to the most recently accepted message.
    Part(Part),
}
1938
/// Running totals for an ingest run, accumulated through `add_batch`,
/// `add_outcomes`, `add_outcomes_errors_only` and `merge`.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct IngestSummary {
    // Rows newly inserted, summed across sessions, messages and parts.
    pub inserted: usize,
    // Rows reported as matched (not inserted), summed across all kinds.
    pub matched: usize,
    pub sessions_inserted: usize,
    // All inserted messages.
    pub messages_inserted_total: usize,
    // Inserted messages whose outcome was flagged `searchable`.
    pub messages_inserted_searchable: usize,
    pub parts_inserted: usize,
    pub sessions_matched: usize,
    // All matched messages.
    pub messages_matched_total: usize,
    // Matched messages whose outcome was flagged `searchable`.
    pub messages_matched_searchable: usize,
    pub parts_matched: usize,
    // Rejected non-session rows (messages and parts).
    pub dropped_events: usize,
    // Rejected session rows.
    pub dropped_sessions: usize,
    // NOTE(review): the skipped_* / storage_errors / truncated_values counters
    // are only summed by `merge` here; presumably incremented by callers
    // elsewhere — confirm.
    pub skipped_files: usize,
    pub skipped_empty: usize,
    pub skipped_fresh: usize,
    pub storage_errors: usize,
    pub truncated_values: usize,
    // Rejected rows per drop-reason key (`DROP_REASON_*`).
    pub drop_reasons: BTreeMap<&'static str, usize>,
}
2014
/// A message id repeated within one session substream.
pub const DROP_REASON_DUPLICATE_MESSAGE_ID: &str = "duplicate_message_id";
/// A `(message_id, part_id)` pair repeated within one session substream.
pub const DROP_REASON_DUPLICATE_PART_KEY: &str = "duplicate_part_key";
/// A message arrived while no session was open.
pub const DROP_REASON_MESSAGE_BEFORE_SESSION: &str = "message_before_session";
/// A message's `session_id` differs from the open session's id.
pub const DROP_REASON_MESSAGE_SESSION_MISMATCH: &str = "message_session_mismatch";
/// A part arrived while no message was buffered.
pub const DROP_REASON_PART_BEFORE_MESSAGE: &str = "part_before_message";
/// A part's `session_id` or `message_id` differs from the buffered message.
pub const DROP_REASON_PART_MESSAGE_MISMATCH: &str = "part_message_mismatch";
/// A session whose `source_agent` is empty after trimming.
pub const DROP_REASON_EMPTY_SOURCE_AGENT: &str = "empty_source_agent";
/// A session that has `parent_message_id` but no `parent_session_id`.
pub const DROP_REASON_PARENT_MESSAGE_WITHOUT_SESSION: &str = "parent_message_without_session";
/// Not referenced in this section; presumably a re-ingest that tried to change
/// a stored session's project — confirm at the use site.
pub const DROP_REASON_IMMUTABLE_PROJECT: &str = "immutable_project";
/// Not referenced in this section; presumably a re-ingest that tried to change
/// a stored session's source agent — confirm at the use site.
pub const DROP_REASON_IMMUTABLE_SOURCE_AGENT: &str = "immutable_source_agent";
/// Fallback bucket for error outcomes that carry no `reason_key`.
pub const DROP_REASON_UNCATEGORIZED: &str = "uncategorized";
2031
/// Per-kind insert/match counts for one persisted batch.
///
/// Returned alongside row outcomes by `IngestValidator::flush` / `finish`, and
/// folded into an `IngestSummary` by `IngestSummary::add_batch`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct BatchCounts {
    pub sessions_inserted: usize,
    pub sessions_matched: usize,
    pub messages_inserted_total: usize,
    pub messages_inserted_searchable: usize,
    pub messages_matched_total: usize,
    pub messages_matched_searchable: usize,
    pub parts_inserted: usize,
    pub parts_matched: usize,
}
2050
2051impl IngestSummary {
2052 pub fn accepted(&self) -> usize {
2053 self.inserted + self.matched
2054 }
2055
2056 pub fn add_batch(&mut self, counts: &BatchCounts) {
2060 self.sessions_inserted += counts.sessions_inserted;
2061 self.sessions_matched += counts.sessions_matched;
2062 self.messages_inserted_total += counts.messages_inserted_total;
2063 self.messages_inserted_searchable += counts.messages_inserted_searchable;
2064 self.messages_matched_total += counts.messages_matched_total;
2065 self.messages_matched_searchable += counts.messages_matched_searchable;
2066 self.parts_inserted += counts.parts_inserted;
2067 self.parts_matched += counts.parts_matched;
2068 self.inserted +=
2069 counts.sessions_inserted + counts.messages_inserted_total + counts.parts_inserted;
2070 self.matched +=
2071 counts.sessions_matched + counts.messages_matched_total + counts.parts_matched;
2072 }
2073
2074 pub fn merge(&mut self, other: &Self) {
2078 self.inserted += other.inserted;
2079 self.matched += other.matched;
2080 self.sessions_inserted += other.sessions_inserted;
2081 self.messages_inserted_total += other.messages_inserted_total;
2082 self.messages_inserted_searchable += other.messages_inserted_searchable;
2083 self.parts_inserted += other.parts_inserted;
2084 self.sessions_matched += other.sessions_matched;
2085 self.messages_matched_total += other.messages_matched_total;
2086 self.messages_matched_searchable += other.messages_matched_searchable;
2087 self.parts_matched += other.parts_matched;
2088 self.dropped_events += other.dropped_events;
2089 self.dropped_sessions += other.dropped_sessions;
2090 self.skipped_files += other.skipped_files;
2091 self.skipped_empty += other.skipped_empty;
2092 self.skipped_fresh += other.skipped_fresh;
2093 self.storage_errors += other.storage_errors;
2094 self.truncated_values += other.truncated_values;
2095 for (key, value) in &other.drop_reasons {
2096 *self.drop_reasons.entry(key).or_insert(0) += value;
2097 }
2098 }
2099
2100 pub fn add_outcomes_errors_only(&mut self, outcomes: &[RowOutcome]) {
2105 for outcome in outcomes {
2106 if !matches!(outcome.status, OutcomeStatus::Error) {
2107 continue;
2108 }
2109 if outcome.kind == "session" {
2110 self.dropped_sessions += 1;
2111 } else {
2112 self.dropped_events += 1;
2113 }
2114 let reason = outcome
2115 .error
2116 .as_ref()
2117 .and_then(|error| error.reason_key)
2118 .unwrap_or(DROP_REASON_UNCATEGORIZED);
2119 *self.drop_reasons.entry(reason).or_insert(0) += 1;
2120 }
2121 }
2122
2123 pub fn add_outcomes(&mut self, outcomes: &[RowOutcome]) {
2124 for outcome in outcomes {
2125 match outcome.status {
2126 OutcomeStatus::Inserted => {
2127 self.inserted += 1;
2128 match outcome.kind {
2129 "session" => self.sessions_inserted += 1,
2130 "message" => {
2131 self.messages_inserted_total += 1;
2132 if outcome.searchable {
2133 self.messages_inserted_searchable += 1;
2134 }
2135 }
2136 "part" => self.parts_inserted += 1,
2137 _ => {}
2138 }
2139 }
2140 OutcomeStatus::Matched => {
2141 self.matched += 1;
2142 match outcome.kind {
2143 "session" => self.sessions_matched += 1,
2144 "message" => {
2145 self.messages_matched_total += 1;
2146 if outcome.searchable {
2147 self.messages_matched_searchable += 1;
2148 }
2149 }
2150 "part" => self.parts_matched += 1,
2151 _ => {}
2152 }
2153 }
2154 OutcomeStatus::Error => {
2155 if outcome.kind == "session" {
2161 self.dropped_sessions += 1;
2162 } else {
2163 self.dropped_events += 1;
2164 }
2165 let reason = outcome
2166 .error
2167 .as_ref()
2168 .and_then(|e| e.reason_key)
2169 .unwrap_or(DROP_REASON_UNCATEGORIZED);
2170 *self.drop_reasons.entry(reason).or_insert(0) += 1;
2171 }
2172 }
2173 }
2174 }
2175}
2176
/// Result of validating or persisting one ingest row.
#[derive(Debug, Clone, PartialEq)]
pub struct RowOutcome {
    // Position of the originating event in the ingest stream, as passed to `push`.
    pub index: usize,
    // "session", "message" or "part".
    pub kind: &'static str,
    // JSON primary key: the session id string, `[session_id, message_id]`, or
    // `[session_id, message_id, part_id]`, depending on `kind`.
    pub pk: Value,
    pub status: OutcomeStatus,
    // Populated for `OutcomeStatus::Error` outcomes.
    pub error: Option<RowError>,
    // For messages, whether the row counts as searchable in `IngestSummary`.
    pub searchable: bool,
}
2194
/// Disposition of one ingest row.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OutcomeStatus {
    /// The row was newly written.
    Inserted,
    /// The row was counted as matched rather than inserted (presumably it
    /// already existed — confirm at the writer).
    Matched,
    /// The row was rejected; see `RowOutcome::error`.
    Error,
}
2201
/// Why a row was rejected.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RowError {
    // Human-readable description.
    pub message: String,
    // Offending field, when one can be named (e.g. "session_id").
    pub field: Option<&'static str>,
    // Always `None` at the construction sites in this section.
    pub reason: Option<&'static str>,
    // Drop-reason key (`DROP_REASON_*`) used for the summary histogram.
    pub reason_key: Option<&'static str>,
}
2215
/// The currently open session, held until its substream is closed.
#[derive(Debug)]
struct BufferedSession {
    // Stream index of the session event.
    index: usize,
    session: Session,
}
2224
/// An accepted message plus the parts collected for it.
#[derive(Debug)]
struct BufferedMessage {
    // Stream index of the message event.
    index: usize,
    message: Message,
    // Filled when the message is flushed from the validator's current slot.
    parts: Vec<BufferedPart>,
    // Computed from the message and its parts at flush time.
    search_text: Option<String>,
}
2232
/// An accepted part, tagged with the stream index of its event.
#[derive(Debug)]
struct BufferedPart {
    index: usize,
    part: Part,
}
2238
/// Streaming validator that groups ingest events into per-session substreams.
///
/// Substreams are buffered until `flush` / `finish` hands them to the store.
#[derive(Debug, Default)]
pub struct IngestValidator {
    // The open session, if any.
    session: Option<BufferedSession>,
    // The most recent accepted message; its parts accumulate in `current_parts`.
    current_message: Option<BufferedMessage>,
    current_parts: Vec<BufferedPart>,
    // Messages already flushed from `current_message` for the open session.
    messages: Vec<BufferedMessage>,
    // Message ids seen in the open substream (duplicate detection).
    seen_message_ids: HashSet<String>,
    // `(message_id, part_id)` pairs seen in the open substream.
    seen_part_keys: HashSet<(String, String)>,
    // Closed substreams awaiting `flush`.
    completed: Vec<CompletedSubstream>,
}
2273
/// A closed session substream, ready to be written by the store.
#[derive(Debug)]
struct CompletedSubstream {
    // Stream index of the session event.
    session_index: usize,
    session: Session,
    messages: Vec<BufferedMessage>,
}
2281
2282fn ingest_host_stamp() -> Option<&'static Value> {
2287 static STAMP: std::sync::OnceLock<Option<Value>> = std::sync::OnceLock::new();
2288 STAMP
2289 .get_or_init(|| {
2290 let mut host = serde_json::Map::new();
2291 if let Ok(username) = whoami::username() {
2292 host.insert("username".to_owned(), username.into());
2293 }
2294 if let Ok(hostname) = whoami::hostname() {
2295 host.insert("hostname".to_owned(), hostname.into());
2296 }
2297 if let Ok(devicename) = whoami::devicename() {
2298 host.insert("device_name".to_owned(), devicename.into());
2299 }
2300 (!host.is_empty()).then(|| serde_json::json!({ "ingest": { "host": host } }))
2301 })
2302 .as_ref()
2303}
2304
2305impl IngestValidator {
2306 pub async fn push(
2312 &mut self,
2313 store: &Store,
2314 index: usize,
2315 event: IngestEvent,
2316 ) -> Result<Vec<RowOutcome>> {
2317 match event {
2318 IngestEvent::Session(session) => self.push_session(store, index, session).await,
2319 IngestEvent::Message(message) => Ok(self.push_message(index, message)),
2320 IngestEvent::Part(part) => Ok(self.push_part(index, part)),
2321 }
2322 }
2323
2324 pub async fn finish(&mut self, store: &Store) -> Result<(Vec<RowOutcome>, BatchCounts)> {
2329 self.close_current_substream();
2330 self.flush(store).await
2331 }
2332
2333 pub async fn flush(&mut self, store: &Store) -> Result<(Vec<RowOutcome>, BatchCounts)> {
2340 if self.completed.is_empty() {
2341 return Ok((Vec::new(), BatchCounts::default()));
2342 }
2343 let completed = std::mem::take(&mut self.completed);
2344 store.upsert_session_batch(completed).await
2345 }
2346
2347 pub fn pending_substreams(&self) -> usize {
2350 self.completed.len()
2351 }
2352
2353 async fn push_session(
2354 &mut self,
2355 _store: &Store,
2356 index: usize,
2357 mut session: Session,
2358 ) -> Result<Vec<RowOutcome>> {
2359 self.close_current_substream();
2363
2364 let trimmed = session.source_agent.trim();
2369 if trimmed.is_empty() {
2370 return Ok(vec![RowOutcome {
2371 index,
2372 kind: "session",
2373 pk: Value::String(session.id.clone()),
2374 status: OutcomeStatus::Error,
2375 error: Some(RowError {
2376 message: format!("session {} has empty source_agent after trim", session.id),
2377 field: Some("source_agent"),
2378 reason: None,
2379 reason_key: Some(DROP_REASON_EMPTY_SOURCE_AGENT),
2380 }),
2381 searchable: false,
2382 }]);
2383 }
2384 if trimmed.len() != session.source_agent.len() {
2385 session.source_agent = trimmed.to_owned();
2386 }
2387
2388 if session.parent_message_id.is_some() && session.parent_session_id.is_none() {
2389 return Ok(vec![RowOutcome {
2390 index,
2391 kind: "session",
2392 pk: Value::String(session.id.clone()),
2393 status: OutcomeStatus::Error,
2394 error: Some(RowError {
2395 message: format!(
2396 "session {} has parent_message_id without parent_session_id",
2397 session.id,
2398 ),
2399 field: Some("parent_message_id"),
2400 reason: None,
2401 reason_key: Some(DROP_REASON_PARENT_MESSAGE_WITHOUT_SESSION),
2402 }),
2403 searchable: false,
2404 }]);
2405 }
2406
2407 self.seen_message_ids.clear();
2408 self.seen_part_keys.clear();
2409 self.session = Some(BufferedSession { index, session });
2410 Ok(Vec::new())
2411 }
2412
2413 fn close_current_substream(&mut self) {
2414 self.flush_current_message();
2415 let Some(BufferedSession {
2416 index: session_index,
2417 session,
2418 }) = self.session.take()
2419 else {
2420 return;
2421 };
2422 let messages = std::mem::take(&mut self.messages);
2423 self.seen_message_ids.clear();
2424 self.seen_part_keys.clear();
2425 self.completed.push(CompletedSubstream {
2426 session_index,
2427 session,
2428 messages,
2429 });
2430 }
2431
2432 fn push_message(&mut self, index: usize, mut message: Message) -> Vec<RowOutcome> {
2433 let pk = Value::Array(vec![
2434 Value::String(message.session_id().to_owned()),
2435 Value::String(message.id().to_owned()),
2436 ]);
2437 let Some(session) = &self.session else {
2438 return vec![error_outcome(
2439 index,
2440 "message",
2441 pk,
2442 "first event in a session stream must be Session",
2443 None,
2444 DROP_REASON_MESSAGE_BEFORE_SESSION,
2445 )];
2446 };
2447 if message.session_id() != session.session.id {
2448 let msg = format!(
2449 "message {} references session {}, expected {}",
2450 message.id(),
2451 message.session_id(),
2452 session.session.id
2453 );
2454 return vec![error_outcome(
2455 index,
2456 "message",
2457 pk,
2458 &msg,
2459 Some("session_id"),
2460 DROP_REASON_MESSAGE_SESSION_MISMATCH,
2461 )];
2462 }
2463 if !self.seen_message_ids.insert(message.id().to_owned()) {
2464 let msg = format!("duplicate message id {} in session substream", message.id());
2468 return vec![error_outcome(
2469 index,
2470 "message",
2471 pk,
2472 &msg,
2473 None,
2474 DROP_REASON_DUPLICATE_MESSAGE_ID,
2475 )];
2476 }
2477 match ingest_host_stamp() {
2482 Some(stamp) => {
2483 message
2484 .options_mut()
2485 .insert("pond".to_owned(), stamp.clone());
2486 }
2487 None => {
2488 message.options_mut().remove("pond");
2489 }
2490 }
2491 self.flush_current_message();
2492 self.current_message = Some(BufferedMessage {
2493 index,
2494 message,
2495 parts: Vec::new(),
2496 search_text: None,
2497 });
2498 Vec::new()
2499 }
2500
2501 fn push_part(&mut self, index: usize, part: Part) -> Vec<RowOutcome> {
2502 let pk = Value::Array(vec![
2503 Value::String(part.session_id.clone()),
2504 Value::String(part.message_id.clone()),
2505 Value::String(part.id.clone()),
2506 ]);
2507 let Some(current) = &self.current_message else {
2508 return vec![error_outcome(
2509 index,
2510 "part",
2511 pk,
2512 "part event appeared before a message",
2513 None,
2514 DROP_REASON_PART_BEFORE_MESSAGE,
2515 )];
2516 };
2517 if part.session_id != current.message.session_id() {
2518 let msg = format!(
2519 "part {} references session {}, expected {}",
2520 part.id,
2521 part.session_id,
2522 current.message.session_id()
2523 );
2524 return vec![error_outcome(
2525 index,
2526 "part",
2527 pk,
2528 &msg,
2529 Some("session_id"),
2530 DROP_REASON_PART_MESSAGE_MISMATCH,
2531 )];
2532 }
2533 if part.message_id != current.message.id() {
2534 let msg = format!(
2535 "part {} references message {}, expected {}",
2536 part.id,
2537 part.message_id,
2538 current.message.id()
2539 );
2540 return vec![error_outcome(
2541 index,
2542 "part",
2543 pk,
2544 &msg,
2545 Some("message_id"),
2546 DROP_REASON_PART_MESSAGE_MISMATCH,
2547 )];
2548 }
2549 let part_key = (part.message_id.clone(), part.id.clone());
2550 if !self.seen_part_keys.insert(part_key) {
2551 let msg = format!(
2552 "duplicate part id {} for message {} in session substream",
2553 part.id, part.message_id
2554 );
2555 return vec![error_outcome(
2556 index,
2557 "part",
2558 pk,
2559 &msg,
2560 None,
2561 DROP_REASON_DUPLICATE_PART_KEY,
2562 )];
2563 }
2564 self.current_parts.push(BufferedPart { index, part });
2565 Vec::new()
2566 }
2567
2568 fn flush_current_message(&mut self) {
2569 let Some(mut buffered) = self.current_message.take() else {
2570 return;
2571 };
2572 let parts = std::mem::take(&mut self.current_parts);
2573 let mut canonical_parts = Vec::with_capacity(parts.len());
2574 for part in &parts {
2575 canonical_parts.push(part.part.clone());
2576 }
2577 buffered.search_text = search_text(&buffered.message, &canonical_parts);
2578 buffered.parts = parts;
2579 self.messages.push(buffered);
2580 }
2581}
2582
2583fn error_outcome(
2584 index: usize,
2585 kind: &'static str,
2586 pk: Value,
2587 message: &str,
2588 field: Option<&'static str>,
2589 reason_key: &'static str,
2590) -> RowOutcome {
2591 RowOutcome {
2592 index,
2593 kind,
2594 pk,
2595 status: OutcomeStatus::Error,
2596 error: Some(RowError {
2597 message: message.to_owned(),
2598 field,
2599 reason: None,
2600 reason_key: Some(reason_key),
2601 }),
2602 searchable: false,
2603 }
2604}
2605
2606fn error_outcomes_for_substream(
2611 session_index: usize,
2612 session: &Session,
2613 _messages: &[BufferedMessage],
2614 message: impl Into<String>,
2615 field: Option<&'static str>,
2616 reason_key: &'static str,
2617) -> Vec<RowOutcome> {
2618 let reason = field.map(|_| "immutable");
2619 vec![RowOutcome {
2620 index: session_index,
2621 kind: "session",
2622 pk: Value::String(session.id.clone()),
2623 status: OutcomeStatus::Error,
2624 error: Some(RowError {
2625 message: message.into(),
2626 field,
2627 reason,
2628 reason_key: Some(reason_key),
2629 }),
2630 searchable: false,
2631 }]
2632}
2633
2634fn success_outcomes_for_substream(
2640 session_index: usize,
2641 session: &Session,
2642 messages: &[BufferedMessage],
2643 existing_sessions: &std::collections::HashMap<String, Session>,
2644 existing_message_pks: &HashSet<(String, String)>,
2645 existing_part_pks: &HashSet<(String, String, String)>,
2646 counts: &mut BatchCounts,
2647) -> Vec<RowOutcome> {
2648 let session_was_present = existing_sessions.contains_key(&session.id);
2649 let session_status = if session_was_present {
2650 counts.sessions_matched += 1;
2651 UpsertStatus::Matched
2652 } else {
2653 counts.sessions_inserted += 1;
2654 UpsertStatus::Inserted
2655 };
2656
2657 let mut outcomes = Vec::with_capacity(1 + messages.len());
2658 outcomes.push(success_outcome(
2659 session_index,
2660 "session",
2661 Value::String(session.id.clone()),
2662 session_status,
2663 false,
2664 ));
2665 for buffered in messages {
2666 let key = (
2667 buffered.message.session_id().to_owned(),
2668 buffered.message.id().to_owned(),
2669 );
2670 let searchable = buffered.search_text.is_some();
2671 let message_status = if existing_message_pks.contains(&key) {
2672 counts.messages_matched_total += 1;
2673 if searchable {
2674 counts.messages_matched_searchable += 1;
2675 }
2676 UpsertStatus::Matched
2677 } else {
2678 counts.messages_inserted_total += 1;
2679 if searchable {
2680 counts.messages_inserted_searchable += 1;
2681 }
2682 UpsertStatus::Inserted
2683 };
2684 let pk = Value::Array(vec![Value::String(key.0), Value::String(key.1)]);
2685 outcomes.push(success_outcome(
2686 buffered.index,
2687 "message",
2688 pk,
2689 message_status,
2690 searchable,
2691 ));
2692 for part in &buffered.parts {
2693 let part_key = (
2694 part.part.session_id.clone(),
2695 part.part.message_id.clone(),
2696 part.part.id.clone(),
2697 );
2698 let part_status = if existing_part_pks.contains(&part_key) {
2699 counts.parts_matched += 1;
2700 UpsertStatus::Matched
2701 } else {
2702 counts.parts_inserted += 1;
2703 UpsertStatus::Inserted
2704 };
2705 let part_pk = Value::Array(vec![
2706 Value::String(part_key.0),
2707 Value::String(part_key.1),
2708 Value::String(part_key.2),
2709 ]);
2710 outcomes.push(success_outcome(
2711 part.index,
2712 "part",
2713 part_pk,
2714 part_status,
2715 false,
2716 ));
2717 }
2718 }
2719 outcomes
2720}
2721
2722fn success_outcome(
2723 index: usize,
2724 kind: &'static str,
2725 pk: Value,
2726 status: UpsertStatus,
2727 searchable: bool,
2728) -> RowOutcome {
2729 let status = match status {
2730 UpsertStatus::Inserted => OutcomeStatus::Inserted,
2731 UpsertStatus::Matched => OutcomeStatus::Matched,
2732 };
2733 RowOutcome {
2734 index,
2735 kind,
2736 pk,
2737 status,
2738 error: None,
2739 searchable,
2740 }
2741}
2742
/// Errors raised while reconciling an incoming session with the stored copy.
#[derive(Debug, Clone, PartialEq, Eq)]
enum IngestError {
    /// A field that may not change after first ingest differs between the
    /// stored session and the incoming one. `ensure_immutable_match` raises
    /// this for `source_agent` and `project`.
    ImmutableField {
        /// Name of the field that differs.
        field: &'static str,
        /// Id of the session being ingested.
        session_id: String,
        /// Value currently stored.
        stored: String,
        /// Value the incoming session tried to write.
        attempted: String,
    },
}

impl std::fmt::Display for IngestError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Single-variant enum: the destructuring pattern is irrefutable.
        let Self::ImmutableField {
            field,
            session_id,
            stored,
            attempted,
        } = self;
        write!(
            f,
            "session {session_id} {field} is immutable: stored {stored:?}, attempted {attempted:?}",
        )
    }
}

impl std::error::Error for IngestError {}
2774
2775fn ensure_immutable_match(
2779 existing: &Session,
2780 incoming: &Session,
2781) -> std::result::Result<(), IngestError> {
2782 if existing.source_agent != incoming.source_agent {
2783 return Err(IngestError::ImmutableField {
2784 field: "source_agent",
2785 session_id: incoming.id.clone(),
2786 stored: existing.source_agent.clone(),
2787 attempted: incoming.source_agent.clone(),
2788 });
2789 }
2790 if existing.project != incoming.project {
2791 return Err(IngestError::ImmutableField {
2792 field: "project",
2793 session_id: incoming.id.clone(),
2794 stored: (*existing.project).clone(),
2795 attempted: (*incoming.project).clone(),
2796 });
2797 }
2798 Ok(())
2799}
2800
2801pub fn search_text(message: &Message, parts: &[Part]) -> Option<String> {
2802 use crate::wire::Provenance;
2803 let mut chunks: Vec<String> = Vec::new();
2804 for part in parts {
2805 if part.provenance != Provenance::Conversational {
2808 continue;
2809 }
2810 match (message.role(), &part.kind) {
2811 (Role::User | Role::Assistant, PartKind::Text { text }) => {
2812 if let Some(text) = text {
2813 chunks.push(text.to_string());
2814 }
2815 }
2816 (
2817 Role::User | Role::Assistant,
2818 PartKind::File {
2819 media_type,
2820 file_name,
2821 data,
2822 },
2823 ) => {
2824 if let Some(file_name) = file_name {
2825 chunks.push(file_name.clone());
2826 }
2827 if let Some(media_type) = media_type {
2828 chunks.push(media_type.clone());
2829 }
2830 if let FileData::Url(uri) = data {
2831 chunks.push(uri.clone());
2832 }
2833 }
2834 (
2835 Role::System | Role::Tool,
2836 PartKind::Text { .. }
2837 | PartKind::Reasoning { .. }
2838 | PartKind::File { .. }
2839 | PartKind::ToolCall { .. }
2840 | PartKind::ToolResult { .. }
2841 | PartKind::ToolApprovalRequest { .. }
2842 | PartKind::ToolApprovalResponse { .. },
2843 )
2844 | (
2845 Role::User | Role::Assistant,
2846 PartKind::Reasoning { .. }
2847 | PartKind::ToolCall { .. }
2848 | PartKind::ToolResult { .. }
2849 | PartKind::ToolApprovalRequest { .. }
2850 | PartKind::ToolApprovalResponse { .. },
2851 ) => {}
2852 }
2853 }
2854
2855 let text = chunks
2856 .into_iter()
2857 .filter(|chunk| !chunk.trim().is_empty())
2858 .collect::<Vec<_>>()
2859 .join("\n");
2860 if text.is_empty() { None } else { Some(text) }
2861}
2862
#[derive(Debug, Clone, PartialEq, Eq)]
/// Owned wrapper around a message's derived search text.
pub struct SearchText(String);

impl SearchText {
    /// Borrows the wrapped text.
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }

    /// Unwraps into the owned `String`.
    pub fn into_inner(self) -> String {
        let SearchText(inner) = self;
        inner
    }
}

impl AsRef<str> for SearchText {
    fn as_ref(&self) -> &str {
        self.as_str()
    }
}
2882
/// A message paired with its parts.
#[derive(Debug, Clone, PartialEq)]
pub struct MessageWithParts {
    pub message: Message,
    pub parts: Vec<Part>,
}

/// A session paired with its messages, each carrying its own parts.
#[derive(Debug, Clone, PartialEq)]
pub struct SessionWithMessages {
    pub session: Session,
    pub messages: Vec<MessageWithParts>,
}
2894
/// Paging and rendering parameters for a session view.
#[derive(Debug, Clone)]
pub struct SessionViewParams<'a> {
    pub mode: ResponseMode,
    /// Cursor: presumably the id of the last message already returned — confirm against callers.
    pub after_id: Option<&'a str>,
    /// Item cap; NOTE(review): if paged via `page_by`, it is clamped to 1..=1000 there.
    pub limit: usize,
    /// Byte budget; NOTE(review): if paged via `page_by`, at least one item is still emitted.
    pub budget_bytes: usize,
    pub session_from: SessionFrom,
}

/// Paging and rendering parameters for a single-message view.
#[derive(Debug, Clone)]
pub struct MessageViewParams<'a> {
    /// NOTE(review): presumably the number of surrounding messages to include — confirm.
    pub context_depth: usize,
    pub mode: ResponseMode,
    /// Cursor for continuing a previous page (semantics defined by the caller).
    pub after_id: Option<&'a str>,
    pub limit: usize,
    pub budget_bytes: usize,
}
2915
#[derive(Debug, Clone, PartialEq)]
/// Outcome of a keyed lookup that also honours an `after_id` cursor.
///
/// NOTE(review): variant meanings are inferred from their names — `NotFound`
/// for a missing target, `UnknownAfterId` for an unrecognised cursor; confirm
/// against the producers of this type.
pub enum GetLookup<T> {
    NotFound,
    UnknownAfterId,
    Found(T),
}

#[derive(Debug, Clone, PartialEq)]
/// One page of a session's messages.
pub struct SessionPage {
    pub session: Session,
    pub messages: Vec<RetrievedMessage>,
    /// How many messages were left out of this page.
    pub messages_remaining: usize,
}

#[derive(Debug, Clone, PartialEq)]
/// A target message, a page of its parts, and sibling messages from the same session.
pub struct MessagePage {
    pub session: Session,
    pub target: RetrievedMessage,
    pub target_parts: Vec<Part>,
    /// How many of the target's parts were left out of `target_parts`.
    pub target_parts_remaining: usize,
    pub siblings: Vec<RetrievedMessage>,
}

/// A message as returned to readers, with optional stored text/content and its parts.
#[derive(Debug, Clone, PartialEq)]
pub struct RetrievedMessage {
    pub id: String,
    pub role: Role,
    pub timestamp: DateTime<Utc>,
    pub text: Option<String>,
    pub content: Option<String>,
    pub parts: Vec<Part>,
}
2959
/// Internal projection of a message row read during a scan.
///
/// Same fields as `RetrievedMessage` without `parts`.
#[derive(Debug, Clone)]
struct ScanRow {
    id: String,
    role: Role,
    timestamp: DateTime<Utc>,
    text: Option<String>,
    content: Option<String>,
}

#[derive(Debug, Clone)]
/// A message row reduced to its keys, role, timestamp and derived search text.
pub struct ConversationalRow {
    pub session_id: String,
    pub message_id: String,
    pub role: Role,
    pub timestamp: DateTime<Utc>,
    pub text: SearchText,
}
2979
/// Returns how many leading `items` fit in one page.
///
/// The page holds at most `limit` items, where `limit` is clamped to
/// `1..=1000`. Items are added while the running total of `size(item)`
/// (saturating) stays within `budget_bytes`. The first item is always
/// admitted, even if it alone exceeds the budget, so a non-empty input never
/// yields an empty page.
fn page_by<T>(items: &[T], limit: usize, budget_bytes: usize, size: impl Fn(&T) -> usize) -> usize {
    items
        .iter()
        .take(limit.clamp(1, 1000))
        .try_fold((0usize, 0usize), |(used, count), item| {
            let next = used.saturating_add(size(item));
            if count > 0 && next > budget_bytes {
                // Budget exhausted: stop and report what fit so far.
                Err(count)
            } else {
                Ok((next, count + 1))
            }
        })
        .map_or_else(|stopped_at| stopped_at, |(_, count)| count)
}
2998
2999fn role_from_str(value: &str) -> Result<Role> {
3000 match value {
3001 "system" => Ok(Role::System),
3002 "user" => Ok(Role::User),
3003 "assistant" => Ok(Role::Assistant),
3004 "tool" => Ok(Role::Tool),
3005 other => anyhow::bail!("unknown message role {other}"),
3006 }
3007}
3008
/// Scalar indices on the `messages` table, as `(column, index type, index name)`.
///
/// `pond_index_intents_with_vector_threshold` expands these, in this order, into
/// `IndexTrigger::OnAnyRows` intents between the FTS and vector intents.
const MESSAGE_SCALAR_INDICES: &[(&str, BuiltinIndexType, &str)] = &[
    ("project", BuiltinIndexType::BTree, "messages_project_btree"),
    (
        "session_id",
        BuiltinIndexType::BTree,
        "messages_session_id_btree",
    ),
    (
        "timestamp",
        BuiltinIndexType::BTree,
        "messages_timestamp_btree",
    ),
    (
        "source_agent",
        BuiltinIndexType::Bitmap,
        "messages_source_agent_bitmap",
    ),
    ("role", BuiltinIndexType::Bitmap, "messages_role_bitmap"),
];

/// Scalar indices on the `parts` table, as `(column, index type, index name)`.
const PARTS_SCALAR_INDICES: &[(&str, BuiltinIndexType, &str)] = &[
    (
        "session_id",
        BuiltinIndexType::BTree,
        "parts_session_id_btree",
    ),
    (
        "message_id",
        BuiltinIndexType::BTree,
        "parts_message_id_btree",
    ),
];

/// Scalar indices on the `sessions` table, as `(column, index type, index name)`.
const SESSIONS_SCALAR_INDICES: &[(&str, BuiltinIndexType, &str)] =
    &[("id", BuiltinIndexType::BTree, "sessions_id_btree")];
3055
3056fn in_predicate(column: &'static str, values: &[String]) -> Predicate {
3057 Predicate::In(
3058 column,
3059 values.iter().cloned().map(ScalarValue::String).collect(),
3060 )
3061}
3062
3063fn embedded_scope(filter: &Predicate) -> Predicate {
3068 Predicate::And(vec![Predicate::IsNotNull("vector"), filter.clone()])
3069}
3070
3071fn statuses_from_inserted(total: usize, inserted_rows: u64) -> Vec<UpsertStatus> {
3072 let inserted = usize::try_from(inserted_rows)
3073 .unwrap_or(usize::MAX)
3074 .min(total);
3075 let mut statuses = Vec::with_capacity(total);
3076 statuses.extend(std::iter::repeat_n(UpsertStatus::Inserted, inserted));
3077 statuses.extend(std::iter::repeat_n(
3078 UpsertStatus::Matched,
3079 total.saturating_sub(inserted),
3080 ));
3081 statuses
3082}
3083
/// Name of the sessions table.
pub(crate) const SESSIONS: &str = "sessions";
/// Name of the messages table.
pub(crate) const MESSAGES: &str = "messages";
/// Name of the parts table.
pub(crate) const PARTS: &str = "parts";

/// Name of the n-gram inverted (FTS) index on `messages.search_text`.
pub const MESSAGES_FTS_INDEX: &str = "messages_search_text_fts";

/// Name of the IVF-PQ (cosine) index on `messages.vector`.
pub const MESSAGES_VECTOR_INDEX: &str = "messages_vector_ivfpq";

/// PQ code width in bits for the vector index.
const IVF_PQ_NUM_BITS: u8 = 8;
/// Embedding dimensions per PQ sub-vector: `sub_vectors = embedding_dim() / 8`.
const IVF_PQ_SUB_VECTOR_STRIDE: usize = 8;
/// Iteration cap passed as `max_iters` for the IVF-PQ index build.
const IVF_PQ_MAX_ITERS: usize = 15;

/// Smallest n-gram length for the FTS index.
const FTS_NGRAM_MIN: u32 = 3;
/// Largest n-gram length for the FTS index.
const FTS_NGRAM_MAX: u32 = 5;
3113
3114pub fn pond_index_intents() -> IndexIntents {
3117 pond_index_intents_with_vector_threshold(VECTOR_INDEX_ACTIVATION_ROWS)
3118}
3119
3120pub(crate) fn pond_index_intents_with_vector_threshold(vector_threshold: usize) -> IndexIntents {
3124 let mut messages = Vec::with_capacity(MESSAGE_SCALAR_INDICES.len() + 2);
3125 messages.push(IndexIntent {
3126 name: MESSAGES_FTS_INDEX,
3127 column: "search_text",
3128 trigger: IndexTrigger::OnAnyRows,
3129 params: IndexParamsKind::InvertedFtsNgram {
3130 min: FTS_NGRAM_MIN,
3131 max: FTS_NGRAM_MAX,
3132 },
3133 });
3134 for (column, kind, name) in MESSAGE_SCALAR_INDICES {
3135 messages.push(IndexIntent {
3136 name,
3137 column,
3138 trigger: IndexTrigger::OnAnyRows,
3139 params: IndexParamsKind::Scalar(kind.clone()),
3140 });
3141 }
3142 messages.push(IndexIntent {
3143 name: MESSAGES_VECTOR_INDEX,
3144 column: "vector",
3145 trigger: IndexTrigger::OnNonNullCount {
3146 column: "vector",
3147 threshold: vector_threshold,
3148 },
3149 params: IndexParamsKind::IvfPqCosine {
3150 sub_vectors: embedding_dim() / IVF_PQ_SUB_VECTOR_STRIDE,
3151 num_bits: IVF_PQ_NUM_BITS,
3152 max_iters: IVF_PQ_MAX_ITERS,
3153 },
3154 });
3155 let parts = PARTS_SCALAR_INDICES
3156 .iter()
3157 .map(|(column, kind, name)| IndexIntent {
3158 name,
3159 column,
3160 trigger: IndexTrigger::OnAnyRows,
3161 params: IndexParamsKind::Scalar(kind.clone()),
3162 })
3163 .collect();
3164 let sessions = SESSIONS_SCALAR_INDICES
3165 .iter()
3166 .map(|(column, kind, name)| IndexIntent {
3167 name,
3168 column,
3169 trigger: IndexTrigger::OnAnyRows,
3170 params: IndexParamsKind::Scalar(kind.clone()),
3171 })
3172 .collect();
3173 IndexIntents {
3174 sessions,
3175 messages,
3176 parts,
3177 }
3178}
3179
/// Embedding width used until `init_embedding_dim` is called.
pub const DEFAULT_EMBEDDING_DIM: usize = 384;

/// Process-wide embedding width, set at most once.
static EMBEDDING_DIM_RUNTIME: std::sync::OnceLock<usize> = std::sync::OnceLock::new();

/// Returns the configured embedding width, or `DEFAULT_EMBEDDING_DIM` if none
/// has been set yet.
pub fn embedding_dim() -> usize {
    match EMBEDDING_DIM_RUNTIME.get() {
        Some(&dim) => dim,
        None => DEFAULT_EMBEDDING_DIM,
    }
}

/// Sets the process-wide embedding width. Only the first call takes effect;
/// later calls are silently ignored.
pub fn init_embedding_dim(dim: usize) {
    // `set` reports `Err` when a value is already stored; first-writer-wins is
    // the intended behaviour, so the error is deliberately discarded.
    let _ = EMBEDDING_DIM_RUNTIME.set(dim);
}
3205
3206pub(crate) fn write_params_for_create() -> WriteParams {
3213 WriteParams {
3214 data_storage_version: Some(LanceFileVersion::V2_1),
3215 enable_v2_manifest_paths: true,
3216 enable_stable_row_ids: true,
3217 auto_cleanup: Some(AutoCleanupParams {
3218 interval: 20,
3219 older_than: chrono::TimeDelta::days(1),
3220 }),
3221 skip_auto_cleanup: true,
3222 ..WriteParams::default()
3223 }
3224}
3225
3226fn export_schema(table: Table) -> Arc<Schema> {
3227 match table {
3228 Table::Sessions => session_schema(),
3229 Table::Messages => message_schema(),
3230 Table::Parts => part_schema(),
3231 }
3232}
3233
3234fn ensure_schema_matches_archive(dataset: &Dataset, table: Table) -> Result<()> {
3235 let expected = export_schema(table);
3236 let actual = lance::deps::arrow_schema::Schema::from(dataset.schema());
3237 let actual_names: Vec<_> = actual.fields().iter().map(|field| field.name()).collect();
3238 let expected_names: Vec<_> = expected.fields().iter().map(|field| field.name()).collect();
3239 if actual_names != expected_names {
3240 anyhow::bail!(
3241 "{} archive table has columns {actual_names:?} but this pond build expects {expected_names:?}",
3242 table.as_str(),
3243 );
3244 }
3245 Ok(())
3246}
3247
3248async fn open_archive_table(table: Table, source: &Path) -> Result<Dataset> {
3249 let source_uri = source
3250 .to_str()
3251 .with_context(|| format!("archive path is not UTF-8: {}", source.display()))?;
3252 let dataset = Dataset::open(source_uri)
3253 .await
3254 .with_context(|| format!("failed to open {} archive table", table.as_str()))?;
3255 ensure_schema_matches_archive(&dataset, table)?;
3256 Ok(dataset)
3257}
3258
/// Arrow schema of the `sessions` table.
///
/// Column order is load-bearing: `sessions_chunk` builds its arrays in this
/// order, and `ensure_schema_matches_archive` compares archive column names
/// (in order) against this list.
pub(crate) fn session_schema() -> Arc<Schema> {
    Arc::new(Schema::new(vec![
        // `primary_field` / `json_field` are defined elsewhere in this file;
        // presumably they attach key / JSON metadata — confirm there.
        primary_field("id", DataType::Utf8, false),
        Field::new("parent_session_id", DataType::Utf8, true),
        Field::new("parent_message_id", DataType::Utf8, true),
        Field::new("source_agent", DataType::Utf8, false),
        Field::new(
            "created_at",
            DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
            false,
        ),
        Field::new("project", DataType::Utf8, false),
        json_field("options", false),
    ]))
}
3274
/// Arrow schema of the `messages` table.
///
/// Column order is load-bearing: `messages_chunk` builds its arrays in this
/// order, and `ensure_schema_matches_archive` compares archive column names
/// (in order) against this list. `(session_id, id)` are the key columns.
pub(crate) fn message_schema() -> Arc<Schema> {
    Arc::new(Schema::new(vec![
        primary_field("session_id", DataType::Utf8, false),
        primary_field("id", DataType::Utf8, false),
        Field::new(
            "timestamp",
            DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
            false,
        ),
        Field::new("role", DataType::Utf8, false),
        Field::new("source_agent", DataType::Utf8, false),
        Field::new("project", DataType::Utf8, false),
        Field::new("content", DataType::Utf8, true),
        Field::new("search_text", DataType::Utf8, true),
        // Fixed-size f16 list of width `embedding_dim()`; written as null by
        // `messages_chunk` and carried by `embedding_update_batch`.
        Field::new("vector", embedding_vector_type(), true),
        Field::new("embedding_model", DataType::Utf8, true),
        json_field("options", false),
    ]))
}
3296
/// Arrow schema of the `parts` table.
///
/// Column order is load-bearing: `parts_chunk` builds its arrays in this
/// order, and `ensure_schema_matches_archive` compares archive column names
/// (in order) against this list. `(session_id, message_id, id)` are the key
/// columns.
pub(crate) fn part_schema() -> Arc<Schema> {
    Arc::new(Schema::new(vec![
        primary_field("session_id", DataType::Utf8, false),
        primary_field("message_id", DataType::Utf8, false),
        primary_field("id", DataType::Utf8, false),
        Field::new("ordinal", DataType::Int32, false),
        Field::new("type", DataType::Utf8, false),
        Field::new("provenance", DataType::Utf8, false),
        json_field("variant_data", false),
        // File payloads (string, bytes or URL) for file parts; null otherwise
        // (see `parts_chunk`). `legacy_blob_field` is defined elsewhere.
        legacy_blob_field("data", true),
        json_field("options", false),
    ]))
}
3312
3313pub(crate) fn empty_batch(schema: Arc<Schema>) -> Result<RecordBatch> {
3314 let arrays = schema
3315 .fields()
3316 .iter()
3317 .map(|field| lance::deps::arrow_array::new_empty_array(field.data_type()))
3318 .collect();
3319 RecordBatch::try_new(schema, arrays).context("failed to build empty Lance batch")
3320}
3321
3322pub(crate) fn empty_reader(
3323 schema: Arc<Schema>,
3324) -> Result<
3325 RecordBatchIterator<
3326 std::vec::IntoIter<Result<RecordBatch, lance::deps::arrow_schema::ArrowError>>,
3327 >,
3328> {
3329 let batch = empty_batch(schema.clone())?;
3330 Ok(RecordBatchIterator::new(
3331 vec![Ok(batch)].into_iter(),
3332 schema,
3333 ))
3334}
3335
/// Borrowed input for one row of a `messages` batch (see `messages_batches`).
///
/// `source_agent` and `project` are supplied alongside the message and written
/// to the row's columns of the same name. `search_text` fills the nullable
/// `search_text` column.
pub(crate) struct MessageBatchRow<'a> {
    pub message: &'a Message,
    pub source_agent: &'a str,
    pub project: &'a str,
    pub search_text: Option<&'a str>,
}
3342
3343fn embedding_vector_type() -> DataType {
3349 DataType::FixedSizeList(
3350 Arc::new(Field::new("item", DataType::Float16, true)),
3351 embedding_dim() as i32,
3352 )
3353}
3354
/// Schema of the partial `messages` batches built by `embedding_update_batch`.
///
/// It holds the `(session_id, id)` key plus the `vector` and `embedding_model`
/// columns, with the same types as in `message_schema`.
fn embedding_update_schema() -> Arc<Schema> {
    Arc::new(Schema::new(vec![
        primary_field("session_id", DataType::Utf8, false),
        primary_field("id", DataType::Utf8, false),
        Field::new("vector", embedding_vector_type(), true),
        Field::new("embedding_model", DataType::Utf8, true),
    ]))
}
3366
3367pub(crate) fn embedding_update_batch(rows: &[EmbeddedMessage]) -> Result<RecordBatch> {
3370 let dim = embedding_dim();
3371 let mut flat = Vec::with_capacity(rows.len() * dim);
3372 for row in rows {
3373 if row.vector.len() != dim {
3374 anyhow::bail!(
3375 "embedding for message {} has dim {}, expected {dim}",
3376 row.id,
3377 row.vector.len(),
3378 );
3379 }
3380 flat.extend(row.vector.iter().map(|value| half::f16::from_f32(*value)));
3381 }
3382 let values = Float16Array::from(flat);
3383 let item_field = Arc::new(Field::new("item", DataType::Float16, true));
3384 let vectors = FixedSizeListArray::try_new(item_field, dim as i32, Arc::new(values), None)
3385 .context("failed to build embedding vector column")?;
3386
3387 RecordBatch::try_new(
3388 embedding_update_schema(),
3389 vec![
3390 Arc::new(StringArray::from(
3391 rows.iter()
3392 .map(|row| row.session_id.as_str())
3393 .collect::<Vec<_>>(),
3394 )),
3395 Arc::new(StringArray::from(
3396 rows.iter().map(|row| row.id.as_str()).collect::<Vec<_>>(),
3397 )),
3398 Arc::new(vectors),
3399 Arc::new(StringArray::from(vec![embed::model_id(); rows.len()])),
3400 ],
3401 )
3402 .context("failed to build embedding update batch")
3403}
3404
/// Byte ceiling per chunk (and per text cell), kept well under Arrow's `i32`
/// offset limit for `Utf8` columns.
const COLUMN_BYTE_BUDGET: usize = 1 << 30;

/// Splits rows into contiguous ranges whose summed `cells` bytes stay within
/// `COLUMN_BYTE_BUDGET`.
///
/// A row that alone exceeds the budget gets a range of its own; ranges are
/// never empty. Returns no ranges for empty input.
fn chunk_ranges(cells: &[usize]) -> Vec<std::ops::Range<usize>> {
    let mut ranges = Vec::new();
    // `current` always ends at the row about to be considered.
    let mut current = 0..0;
    let mut current_bytes = 0usize;
    for &cell in cells {
        if current_bytes + cell > COLUMN_BYTE_BUDGET && !current.is_empty() {
            let next_start = current.end;
            ranges.push(std::mem::replace(&mut current, next_start..next_start));
            current_bytes = 0;
        }
        current.end += 1;
        current_bytes += cell;
    }
    if !current.is_empty() {
        ranges.push(current);
    }
    ranges
}
3432
3433fn guard_cell(table: &str, pk: &str, bytes: usize) -> Result<()> {
3434 if bytes >= COLUMN_BYTE_BUDGET {
3435 anyhow::bail!(
3436 "{table} row {pk}: a {bytes}-byte text cell meets the per-cell ceiling and would \
3437 overflow Arrow's i32 offset buffer"
3438 );
3439 }
3440 Ok(())
3441}
3442
3443async fn merge_insert_chunks(
3444 handle: &Handle,
3445 table: Table,
3446 batches: Vec<RecordBatch>,
3447) -> Result<u64> {
3448 let mut inserted = 0u64;
3449 for batch in batches {
3450 let rows = batch.num_rows();
3451 inserted += handle.merge_insert(table, batch, rows).await?;
3452 }
3453 Ok(inserted)
3454}
3455
3456pub(crate) fn sessions_batches(sessions: &[Session]) -> Result<Vec<RecordBatch>> {
3457 let options = sessions
3458 .iter()
3459 .map(|session| json_bytes(&session.options))
3460 .collect::<Result<Vec<_>>>()?;
3461 let mut cells = Vec::with_capacity(sessions.len());
3462 for (session, encoded) in sessions.iter().zip(&options) {
3463 let columns = [
3464 session.id.len(),
3465 session.parent_session_id.as_deref().map_or(0, str::len),
3466 session.parent_message_id.as_deref().map_or(0, str::len),
3467 session.source_agent.len(),
3468 session.project.as_str().len(),
3469 encoded.len(),
3470 ];
3471 for bytes in columns {
3472 guard_cell("sessions", &session.id, bytes)?;
3473 }
3474 cells.push(columns.iter().sum());
3475 }
3476 chunk_ranges(&cells)
3477 .into_iter()
3478 .map(|range| sessions_chunk(&sessions[range.clone()], &options[range]))
3479 .collect()
3480}
3481
/// Builds one `sessions` batch from rows that already passed the size checks
/// in `sessions_batches`.
///
/// `options` holds the pre-encoded JSON for each session, index-aligned with
/// `sessions`. The arrays below must stay in `session_schema` column order.
fn sessions_chunk(sessions: &[Session], options: &[Vec<u8>]) -> Result<RecordBatch> {
    let schema = session_schema();
    RecordBatch::try_new(
        schema.clone(),
        vec![
            // id
            Arc::new(StringArray::from(
                sessions
                    .iter()
                    .map(|session| session.id.as_str())
                    .collect::<Vec<_>>(),
            )),
            // parent_session_id (nullable)
            Arc::new(StringArray::from(
                sessions
                    .iter()
                    .map(|session| session.parent_session_id.as_deref())
                    .collect::<Vec<_>>(),
            )),
            // parent_message_id (nullable)
            Arc::new(StringArray::from(
                sessions
                    .iter()
                    .map(|session| session.parent_message_id.as_deref())
                    .collect::<Vec<_>>(),
            )),
            // source_agent
            Arc::new(StringArray::from(
                sessions
                    .iter()
                    .map(|session| session.source_agent.as_str())
                    .collect::<Vec<_>>(),
            )),
            // created_at, microseconds tagged "UTC" to match the schema type
            Arc::new(
                TimestampMicrosecondArray::from(
                    sessions
                        .iter()
                        .map(|session| micros(session.created_at))
                        .collect::<Vec<_>>(),
                )
                .with_timezone("UTC"),
            ),
            // project
            Arc::new(StringArray::from(
                sessions
                    .iter()
                    .map(|session| session.project.as_str())
                    .collect::<Vec<_>>(),
            )),
            // options (JSON bytes)
            Arc::new(LargeBinaryArray::from_iter_values(
                options.iter().map(Vec::as_slice),
            )),
        ],
    )
    .context("failed to build session batch")
}
3533
3534pub(crate) fn messages_batches(rows: &[MessageBatchRow<'_>]) -> Result<Vec<RecordBatch>> {
3535 let options = rows
3536 .iter()
3537 .map(|row| json_bytes(row.message.options()))
3538 .collect::<Result<Vec<_>>>()?;
3539 let mut cells = Vec::with_capacity(rows.len());
3540 for (row, encoded) in rows.iter().zip(&options) {
3541 let columns = [
3542 row.message.session_id().len(),
3543 row.message.id().len(),
3544 row.message.role().as_str().len(),
3545 row.source_agent.len(),
3546 row.project.len(),
3547 row.message.system_content().map_or(0, str::len),
3548 row.search_text.map_or(0, str::len),
3549 encoded.len(),
3550 ];
3551 for bytes in columns {
3552 guard_cell("messages", row.message.id(), bytes)?;
3553 }
3554 cells.push(columns.iter().sum());
3555 }
3556 chunk_ranges(&cells)
3557 .into_iter()
3558 .map(|range| messages_chunk(&rows[range.clone()], &options[range]))
3559 .collect()
3560}
3561
/// Builds one `messages` batch from rows that already passed the size checks
/// in `messages_batches`.
///
/// `options` holds the pre-encoded JSON for each row, index-aligned with
/// `rows`. The arrays below must stay in `message_schema` column order.
fn messages_chunk(rows: &[MessageBatchRow<'_>], options: &[Vec<u8>]) -> Result<RecordBatch> {
    let schema = message_schema();
    RecordBatch::try_new(
        schema.clone(),
        vec![
            // session_id
            Arc::new(StringArray::from(
                rows.iter()
                    .map(|row| row.message.session_id())
                    .collect::<Vec<_>>(),
            )),
            // id
            Arc::new(StringArray::from(
                rows.iter().map(|row| row.message.id()).collect::<Vec<_>>(),
            )),
            // timestamp, microseconds tagged "UTC" to match the schema type
            Arc::new(
                TimestampMicrosecondArray::from(
                    rows.iter()
                        .map(|row| micros(row.message.timestamp()))
                        .collect::<Vec<_>>(),
                )
                .with_timezone("UTC"),
            ),
            // role
            Arc::new(StringArray::from(
                rows.iter()
                    .map(|row| row.message.role().as_str())
                    .collect::<Vec<_>>(),
            )),
            // source_agent
            Arc::new(StringArray::from(
                rows.iter().map(|row| row.source_agent).collect::<Vec<_>>(),
            )),
            // project
            Arc::new(StringArray::from(
                rows.iter().map(|row| row.project).collect::<Vec<_>>(),
            )),
            // content: only system messages carry content in this column
            Arc::new(StringArray::from(
                rows.iter()
                    .map(|row| row.message.system_content())
                    .collect::<Vec<_>>(),
            )),
            // search_text (nullable)
            Arc::new(StringArray::from(
                rows.iter().map(|row| row.search_text).collect::<Vec<_>>(),
            )),
            // vector: always null at ingest; NOTE(review): presumably filled
            // later through `embedding_update_batch` — confirm.
            new_null_array(&embedding_vector_type(), rows.len()),
            // embedding_model: null alongside the null vector
            new_null_array(&DataType::Utf8, rows.len()),
            // options (JSON bytes)
            Arc::new(LargeBinaryArray::from_iter_values(
                options.iter().map(Vec::as_slice),
            )),
        ],
    )
    .context("failed to build message batch")
}
3614
3615pub(crate) fn parts_batches(parts: &[Part]) -> Result<Vec<RecordBatch>> {
3616 let variant_data = parts
3617 .iter()
3618 .map(|part| part_variant_json(&part.kind))
3619 .collect::<Result<Vec<_>>>()?;
3620 let options = parts
3621 .iter()
3622 .map(|part| json_bytes(&part.options))
3623 .collect::<Result<Vec<_>>>()?;
3624 let mut cells = Vec::with_capacity(parts.len());
3625 for ((part, variant), encoded) in parts.iter().zip(&variant_data).zip(&options) {
3628 let columns = [
3629 part.session_id.len(),
3630 part.message_id.len(),
3631 part.id.len(),
3632 part.kind.type_name().len(),
3633 part.provenance.as_str().len(),
3634 variant.len(),
3635 encoded.len(),
3636 ];
3637 for bytes in columns {
3638 guard_cell("parts", &part.id, bytes)?;
3639 }
3640 cells.push(columns.iter().sum());
3641 }
3642 chunk_ranges(&cells)
3643 .into_iter()
3644 .map(|range| {
3645 parts_chunk(
3646 &parts[range.clone()],
3647 &variant_data[range.clone()],
3648 &options[range],
3649 )
3650 })
3651 .collect()
3652}
3653
/// Builds one `parts` batch from rows that already passed the size checks in
/// `parts_batches`.
///
/// `variant_data` and `options` hold pre-encoded JSON, index-aligned with
/// `parts`. The arrays below must stay in `part_schema` column order.
fn parts_chunk(
    parts: &[Part],
    variant_data: &[Vec<u8>],
    options: &[Vec<u8>],
) -> Result<RecordBatch> {
    let schema = part_schema();
    // Raw payload for the `data` column. File parts store their string, byte,
    // or URL payload as bytes; every other kind stores null.
    let blob_payloads: Vec<Option<&[u8]>> = parts
        .iter()
        .map(|part| match &part.kind {
            PartKind::File { data, .. } => Some(match data {
                FileData::String(value) => value.as_bytes(),
                FileData::Bytes(value) => value.as_slice(),
                FileData::Url(value) => value.as_bytes(),
            }),
            PartKind::Text { .. }
            | PartKind::Reasoning { .. }
            | PartKind::ToolCall { .. }
            | PartKind::ToolResult { .. }
            | PartKind::ToolApprovalRequest { .. }
            | PartKind::ToolApprovalResponse { .. } => None,
        })
        .collect();
    let blob_array = LargeBinaryArray::from_iter(blob_payloads);

    RecordBatch::try_new(
        schema.clone(),
        vec![
            // session_id
            Arc::new(StringArray::from(
                parts
                    .iter()
                    .map(|part| part.session_id.as_str())
                    .collect::<Vec<_>>(),
            )),
            // message_id
            Arc::new(StringArray::from(
                parts
                    .iter()
                    .map(|part| part.message_id.as_str())
                    .collect::<Vec<_>>(),
            )),
            // id
            Arc::new(StringArray::from(
                parts
                    .iter()
                    .map(|part| part.id.as_str())
                    .collect::<Vec<_>>(),
            )),
            // ordinal
            Arc::new(Int32Array::from(
                parts.iter().map(|part| part.ordinal).collect::<Vec<_>>(),
            )),
            // type
            Arc::new(StringArray::from(
                parts
                    .iter()
                    .map(|part| part.kind.type_name())
                    .collect::<Vec<_>>(),
            )),
            // provenance
            Arc::new(StringArray::from(
                parts
                    .iter()
                    .map(|part| part.provenance.as_str())
                    .collect::<Vec<_>>(),
            )),
            // variant_data (JSON bytes)
            Arc::new(LargeBinaryArray::from_iter_values(
                variant_data.iter().map(Vec::as_slice),
            )),
            // data
            Arc::new(blob_array),
            // options (JSON bytes)
            Arc::new(LargeBinaryArray::from_iter_values(
                options.iter().map(Vec::as_slice),
            )),
        ],
    )
    .context("failed to build parts batch")
}
3728
3729pub(crate) fn session_from_batch(batch: &RecordBatch, row: usize) -> Result<Session> {
3730 Ok(Session {
3731 id: string(batch, "id", row)?.context("session id is null")?,
3732 parent_session_id: string(batch, "parent_session_id", row)?,
3733 parent_message_id: string(batch, "parent_message_id", row)?,
3734 source_agent: string(batch, "source_agent", row)?.context("source_agent is null")?,
3735 created_at: datetime(batch, "created_at", row)?,
3736 project: crate::adapter::Extracted::from_stored(
3737 string(batch, "project", row)?.context("project is null")?,
3738 ),
3739 options: json_parse(&json_column(batch, "options", row)?.context("options is null")?)?,
3740 })
3741}
3742
3743pub(crate) fn message_from_batch(batch: &RecordBatch, row: usize) -> Result<Message> {
3744 let id = string(batch, "id", row)?.context("message id is null")?;
3745 let session_id = string(batch, "session_id", row)?.context("message session_id is null")?;
3746 let timestamp = datetime(batch, "timestamp", row)?;
3747 let options =
3748 json_parse(&json_column(batch, "options", row)?.context("message options is null")?)?;
3749
3750 match string(batch, "role", row)?
3751 .context("message role is null")?
3752 .as_str()
3753 {
3754 "system" => Ok(Message::System {
3755 id,
3756 session_id,
3757 timestamp,
3758 content: string(batch, "content", row)?.map(crate::adapter::Extracted::from_stored),
3765 options,
3766 }),
3767 "user" => Ok(Message::User {
3768 id,
3769 session_id,
3770 timestamp,
3771 options,
3772 }),
3773 "assistant" => Ok(Message::Assistant {
3774 id,
3775 session_id,
3776 timestamp,
3777 options,
3778 }),
3779 "tool" => Ok(Message::Tool {
3780 id,
3781 session_id,
3782 timestamp,
3783 options,
3784 }),
3785 other => anyhow::bail!("unknown message role {other}"),
3786 }
3787}
3788
3789pub(crate) fn part_from_batch(
3790 batch: &RecordBatch,
3791 row: usize,
3792 file_data: Option<FileData>,
3793) -> Result<Part> {
3794 let type_name = string(batch, "type", row)?.context("part type is null")?;
3795 let variant_data = json_column(batch, "variant_data", row)?.context("variant_data is null")?;
3796 let provenance = string(batch, "provenance", row)?.context("part provenance is null")?;
3797 Ok(Part {
3798 session_id: string(batch, "session_id", row)?.context("part session_id is null")?,
3799 message_id: string(batch, "message_id", row)?.context("part message_id is null")?,
3800 id: string(batch, "id", row)?.context("part id is null")?,
3801 ordinal: int32(batch, "ordinal", row)?,
3802 provenance: provenance_from_str(&provenance)?,
3803 options: json_parse(&json_column(batch, "options", row)?.context("part options is null")?)?,
3804 kind: part_kind_from_json(&type_name, &variant_data, file_data)?,
3805 })
3806}
3807
3808fn provenance_from_str(value: &str) -> Result<crate::wire::Provenance> {
3809 match value {
3810 "conversational" => Ok(crate::wire::Provenance::Conversational),
3811 "injected" => Ok(crate::wire::Provenance::Injected),
3812 other => anyhow::bail!("unknown part provenance {other}"),
3813 }
3814}
3815
3816fn file_data_from_blob(variant_data: &[u8], bytes: &[u8]) -> Result<FileData> {
3817 let kind = file_data_kind(variant_data)?;
3818 match kind.as_str() {
3819 "string" => {
3820 let text = std::str::from_utf8(bytes)
3821 .context("file string payload is not UTF-8")?
3822 .to_owned();
3823 Ok(FileData::String(text))
3824 }
3825 "bytes" => Ok(FileData::Bytes(bytes.to_vec())),
3826 "url" => Ok(FileData::Url(
3827 std::str::from_utf8(bytes)
3828 .context("file URL payload is not UTF-8")?
3829 .to_owned(),
3830 )),
3831 other => anyhow::bail!("unknown file data_kind {other}"),
3832 }
3833}
3834
3835fn file_data_kind(variant_data: &[u8]) -> Result<String> {
3836 let value = json_parse::<Value>(variant_data)?;
3837 value
3838 .get("data_kind")
3839 .and_then(Value::as_str)
3840 .map(str::to_owned)
3841 .context("file part variant_data missing data_kind")
3842}
3843
3844fn uint64<'a>(batch: &'a RecordBatch, name: &str) -> Result<&'a UInt64Array> {
3845 batch
3846 .column_by_name(name)
3847 .with_context(|| format!("missing column {name}"))?
3848 .as_any()
3849 .downcast_ref::<UInt64Array>()
3850 .with_context(|| format!("column {name} is not UInt64"))
3851}
3852
3853pub(crate) fn string(batch: &RecordBatch, name: &str, row: usize) -> Result<Option<String>> {
3854 let array = batch
3855 .column_by_name(name)
3856 .with_context(|| format!("missing column {name}"))?
3857 .as_any()
3858 .downcast_ref::<StringArray>()
3859 .with_context(|| format!("column {name} is not Utf8"))?;
3860 if array.is_null(row) {
3861 Ok(None)
3862 } else {
3863 Ok(Some(array.value(row).to_owned()))
3864 }
3865}
3866
3867fn json_column(batch: &RecordBatch, name: &str, row: usize) -> Result<Option<Vec<u8>>> {
3868 let column = batch
3872 .column_by_name(name)
3873 .with_context(|| format!("missing column {name}"))?;
3874 if let Some(array) = column.as_any().downcast_ref::<LargeBinaryArray>() {
3875 return if array.is_null(row) {
3876 Ok(None)
3877 } else {
3878 Ok(Some(
3879 lance_arrow::json::decode_json(array.value(row)).into_bytes(),
3880 ))
3881 };
3882 }
3883 if let Some(array) = column.as_any().downcast_ref::<StringArray>() {
3884 return if array.is_null(row) {
3885 Ok(None)
3886 } else {
3887 Ok(Some(array.value(row).as_bytes().to_vec()))
3888 };
3889 }
3890 if let Some(array) = column.as_any().downcast_ref::<LargeStringArray>() {
3891 return if array.is_null(row) {
3892 Ok(None)
3893 } else {
3894 Ok(Some(array.value(row).as_bytes().to_vec()))
3895 };
3896 }
3897 anyhow::bail!("column {name} is not a JSON-compatible array")
3898}
3899
3900fn int32(batch: &RecordBatch, name: &str, row: usize) -> Result<i32> {
3901 let array = batch
3902 .column_by_name(name)
3903 .with_context(|| format!("missing column {name}"))?
3904 .as_any()
3905 .downcast_ref::<Int32Array>()
3906 .with_context(|| format!("column {name} is not Int32"))?;
3907 Ok(array.value(row))
3908}
3909
3910pub(crate) fn float32(batch: &RecordBatch, name: &str, row: usize) -> Result<f32> {
3911 let array = batch
3912 .column_by_name(name)
3913 .with_context(|| format!("missing column {name}"))?
3914 .as_any()
3915 .downcast_ref::<Float32Array>()
3916 .with_context(|| format!("column {name} is not Float32"))?;
3917 Ok(array.value(row))
3918}
3919
3920pub(crate) fn datetime(batch: &RecordBatch, name: &str, row: usize) -> Result<DateTime<Utc>> {
3921 let array = batch
3922 .column_by_name(name)
3923 .with_context(|| format!("missing column {name}"))?
3924 .as_any()
3925 .downcast_ref::<TimestampMicrosecondArray>()
3926 .with_context(|| format!("column {name} is not timestamp_micros"))?;
3927 Utc.timestamp_micros(array.value(row))
3928 .single()
3929 .context("timestamp is out of range")
3930}
3931
3932fn primary_field(name: &str, data_type: DataType, nullable: bool) -> Field {
3933 Field::new(name, data_type, nullable).with_metadata(
3934 [(
3935 "lance-schema:unenforced-primary-key".to_owned(),
3936 "true".to_owned(),
3937 )]
3938 .into(),
3939 )
3940}
3941
3942fn legacy_blob_field(name: &str, nullable: bool) -> Field {
3952 Field::new(name, DataType::LargeBinary, nullable).with_metadata(
3953 [(lance_arrow::BLOB_META_KEY.to_owned(), "true".to_owned())]
3954 .into_iter()
3955 .collect(),
3956 )
3957}
3958
3959fn json_field(name: &str, nullable: bool) -> Field {
3960 lance_arrow::json::json_field(name, nullable)
3961}
3962
3963fn micros(timestamp: DateTime<Utc>) -> i64 {
3964 timestamp.timestamp_micros()
3965}
3966
3967fn json_bytes<T: Serialize>(value: &T) -> Result<Vec<u8>> {
3968 let text = serde_json::to_string(value).context("failed to serialize JSON field")?;
3976 lance_arrow::json::encode_json(&text)
3977 .map_err(|err| anyhow::anyhow!("failed to encode JSON field as JSONB: {err}"))
3978}
3979
3980fn json_parse<T: DeserializeOwned>(value: &[u8]) -> Result<T> {
3981 serde_json::from_slice(value).context("failed to parse JSON field")
3982}
3983
3984fn part_variant_json(kind: &PartKind) -> Result<Vec<u8>> {
3985 if let PartKind::File {
3986 media_type,
3987 file_name,
3988 data,
3989 } = kind
3990 {
3991 let data_kind = match data {
3992 FileData::String(_) => "string",
3993 FileData::Bytes(_) => "bytes",
3994 FileData::Url(_) => "url",
3995 };
3996 return json_bytes(&serde_json::json!({
3997 "media_type": media_type,
3998 "file_name": file_name,
3999 "data_kind": data_kind,
4000 }));
4001 }
4002 let value = serde_json::to_value(kind)?;
4003 let mut object = value
4004 .as_object()
4005 .cloned()
4006 .context("part variant did not serialize to an object")?;
4007 object.remove("type");
4008 json_bytes(&object)
4009}
4010
4011fn part_kind_from_json(
4012 type_name: &str,
4013 variant_data: &[u8],
4014 file_data: Option<FileData>,
4015) -> Result<PartKind> {
4016 let mut value = json_parse::<Value>(variant_data)?;
4017 let object = value
4018 .as_object_mut()
4019 .context("part variant data is not an object")?;
4020 object.insert("type".to_owned(), Value::String(type_name.to_owned()));
4021 if let Some(data) = file_data {
4022 object.remove("data_kind");
4023 object.insert("data".to_owned(), serde_json::to_value(data)?);
4024 }
4025 serde_json::from_value(value).context("failed to parse part kind")
4026}
4027
4028#[cfg(test)]
4029mod tests {
4030 #![allow(clippy::expect_used, clippy::unwrap_used)]
4031
4032 use super::*;
4033 use crate::{
4034 adapter::Extracted,
4035 handlers::ingest_events,
4036 wire::{FileData, Message, Part, PartKind, ProviderOptions, Session},
4037 };
4038 use chrono::Utc;
4039 use serde_json::json;
4040 use tempfile::TempDir;
4041
4042 fn synthetic_session(id: &str) -> Session {
4043 Session {
4044 id: id.to_owned(),
4045 parent_session_id: None,
4046 parent_message_id: None,
4047 source_agent: "claude-code".to_owned(),
4048 created_at: Utc::now(),
4049 project: crate::adapter::Extracted::from_test_value("/tmp/pond".to_owned()),
4050 options: ProviderOptions::new(),
4051 }
4052 }
4053
4054 #[test]
4055 fn search_text_excludes_injected_parts() {
4056 use crate::wire::Provenance;
4057 let message = Message::User {
4058 id: "m1".to_owned(),
4059 session_id: "s1".to_owned(),
4060 timestamp: Utc::now(),
4061 options: ProviderOptions::new(),
4062 };
4063 let text_part = |id: &str, text: &str, provenance: Provenance| Part {
4064 session_id: "s1".to_owned(),
4065 id: id.to_owned(),
4066 message_id: "m1".to_owned(),
4067 ordinal: 0,
4068 provenance,
4069 options: ProviderOptions::new(),
4070 kind: PartKind::Text {
4071 text: Some(Extracted::from_test_value(text.to_owned())),
4072 },
4073 };
4074
4075 let conversational = search_text(
4078 &message,
4079 &[text_part(
4080 "p1",
4081 "real human prompt",
4082 Provenance::Conversational,
4083 )],
4084 );
4085 assert_eq!(conversational.as_deref(), Some("real human prompt"));
4086
4087 let injected = search_text(
4088 &message,
4089 &[text_part(
4090 "p2",
4091 "<task-notification>...</task-notification>",
4092 Provenance::Injected,
4093 )],
4094 );
4095 assert!(
4096 injected.is_none(),
4097 "a message whose only part is injected has null search_text"
4098 );
4099 }
4100
4101 #[test]
4102 fn chunk_ranges_splits_on_byte_budget() {
4103 assert!(chunk_ranges(&[]).is_empty());
4104 assert_eq!(chunk_ranges(&[10, 10, 10]), vec![0..3]);
4105
4106 let two_thirds = COLUMN_BYTE_BUDGET * 2 / 3;
4107 assert_eq!(
4108 chunk_ranges(&[two_thirds, two_thirds, two_thirds]),
4109 vec![0..1, 1..2, 2..3],
4110 );
4111
4112 assert_eq!(
4114 chunk_ranges(&[10, COLUMN_BYTE_BUDGET + 1, 10]),
4115 vec![0..1, 1..2, 2..3],
4116 );
4117 }
4118
4119 #[tokio::test]
4120 async fn ordering_violation_drops_only_the_offending_event() -> anyhow::Result<()> {
4121 let temp = TempDir::new()?;
4126 let store = Store::open_local(temp.path()).await?;
4127 let session = synthetic_session("ordering");
4128 let orphan_part = Part {
4129 session_id: session.id.clone(),
4130 id: "orphan-part".to_owned(),
4131 message_id: "missing-message".to_owned(),
4132 ordinal: 0,
4133 provenance: crate::wire::Provenance::Conversational,
4134 options: ProviderOptions::new(),
4135 kind: PartKind::Text {
4136 text: Some(Extracted::from_test_value("orphan".to_owned())),
4137 },
4138 };
4139 let valid_message = Message::User {
4140 id: "valid-message".to_owned(),
4141 session_id: session.id.clone(),
4142 timestamp: Utc::now(),
4143 options: ProviderOptions::new(),
4144 };
4145 let valid_part = Part {
4146 session_id: session.id.clone(),
4147 id: "valid-part".to_owned(),
4148 message_id: valid_message.id().to_owned(),
4149 ordinal: 0,
4150 provenance: crate::wire::Provenance::Conversational,
4151 options: ProviderOptions::new(),
4152 kind: PartKind::Text {
4153 text: Some(Extracted::from_test_value("kept".to_owned())),
4154 },
4155 };
4156
4157 let mut validator = IngestValidator::default();
4158 validator
4159 .push(&store, 0, IngestEvent::Session(session.clone()))
4160 .await?;
4161 let part_outcomes = validator
4162 .push(&store, 1, IngestEvent::Part(orphan_part))
4163 .await?;
4164 assert_eq!(part_outcomes.len(), 1);
4165 assert_eq!(part_outcomes[0].kind, "part");
4166 assert_eq!(part_outcomes[0].status, OutcomeStatus::Error);
4167 assert!(
4168 part_outcomes[0]
4169 .error
4170 .as_ref()
4171 .map(|e| e.message.contains("part event appeared before a message"))
4172 .unwrap_or(false),
4173 "error message must explain the ordering violation: {part_outcomes:?}"
4174 );
4175 validator
4176 .push(&store, 2, IngestEvent::Message(valid_message))
4177 .await?;
4178 validator
4179 .push(&store, 3, IngestEvent::Part(valid_part))
4180 .await?;
4181 validator.finish(&store).await?;
4182
4183 let (sessions, messages, parts) = store.row_counts().await?;
4184 assert_eq!(sessions, 1, "session committed despite the orphan part");
4185 assert_eq!(messages, 1, "valid message committed");
4186 assert_eq!(parts, 1, "valid part committed; the orphan was dropped");
4187
4188 Ok(())
4189 }
4190
4191 #[tokio::test]
4192 async fn duplicate_message_id_drops_the_second_keeps_the_first() -> anyhow::Result<()> {
4193 let temp = TempDir::new()?;
4197 let store = Store::open_local(temp.path()).await?;
4198 let session = synthetic_session("duplicate-message");
4199 let first = Message::User {
4200 id: "message-1".to_owned(),
4201 session_id: session.id.clone(),
4202 timestamp: Utc::now(),
4203 options: ProviderOptions::new(),
4204 };
4205 let second = Message::Assistant {
4206 id: "message-1".to_owned(),
4207 session_id: session.id.clone(),
4208 timestamp: Utc::now(),
4209 options: ProviderOptions::new(),
4210 };
4211
4212 let mut validator = IngestValidator::default();
4213 validator
4214 .push(&store, 0, IngestEvent::Session(session.clone()))
4215 .await?;
4216 validator
4217 .push(&store, 1, IngestEvent::Message(first))
4218 .await?;
4219 let dup_outcomes = validator
4220 .push(&store, 2, IngestEvent::Message(second))
4221 .await?;
4222 assert_eq!(dup_outcomes.len(), 1);
4223 assert_eq!(dup_outcomes[0].status, OutcomeStatus::Error);
4224 assert!(
4225 dup_outcomes[0]
4226 .error
4227 .as_ref()
4228 .map(|e| e.message.contains("duplicate message id message-1"))
4229 .unwrap_or(false),
4230 "duplicate-id rejection must name the offending id: {dup_outcomes:?}"
4231 );
4232
4233 validator.finish(&store).await?;
4234 let (sessions, messages, _) = store.row_counts().await?;
4235 assert_eq!(sessions, 1, "session committed");
4236 assert_eq!(messages, 1, "only the first message committed");
4237
4238 Ok(())
4239 }
4240
4241 #[tokio::test]
4242 async fn ingest_stamps_host_provenance_on_messages_and_strips_spoofed_pond_key()
4243 -> anyhow::Result<()> {
4244 let temp = TempDir::new()?;
4248 let store = Store::open_local(temp.path()).await?;
4249 let session = synthetic_session("host-provenance");
4250 let mut spoofed = ProviderOptions::new();
4251 spoofed.insert("pond".to_owned(), json!({"ingest": {"host": "spoofed"}}));
4252 let message = Message::User {
4253 id: "message-1".to_owned(),
4254 session_id: session.id.clone(),
4255 timestamp: Utc::now(),
4256 options: spoofed,
4257 };
4258 let part = Part {
4259 session_id: session.id.clone(),
4260 id: "part-1".to_owned(),
4261 message_id: "message-1".to_owned(),
4262 ordinal: 0,
4263 provenance: crate::wire::Provenance::Conversational,
4264 options: ProviderOptions::new(),
4265 kind: PartKind::Text {
4266 text: Some(Extracted::from_test_value("hello".to_owned())),
4267 },
4268 };
4269
4270 let mut validator = IngestValidator::default();
4271 validator
4272 .push(&store, 0, IngestEvent::Session(session.clone()))
4273 .await?;
4274 validator
4275 .push(&store, 1, IngestEvent::Message(message))
4276 .await?;
4277 validator.push(&store, 2, IngestEvent::Part(part)).await?;
4278 validator.finish(&store).await?;
4279
4280 let stored = store
4281 .get_session(&session.id)
4282 .await?
4283 .expect("ingested session is readable");
4284 assert!(
4285 !stored.session.options.contains_key("pond"),
4286 "session rows are not stamped (attribution derives from messages)"
4287 );
4288 let stored_message = &stored.messages[0].message;
4289 match ingest_host_stamp() {
4290 Some(stamp) => {
4291 assert_eq!(
4292 stored_message.options().get("pond"),
4293 Some(stamp),
4294 "stored message carries the real stamp, never the spoof"
4295 );
4296 let host = stamp
4297 .pointer("/ingest/host")
4298 .and_then(Value::as_object)
4299 .expect("stamp shape is {ingest: {host: {..}}}");
4300 assert!(!host.is_empty(), "an all-empty stamp must be None instead");
4301 assert!(
4302 host.values()
4303 .all(|v| v.as_str().is_some_and(|s| !s.is_empty())),
4304 "stamp fields are omitted when unavailable, never empty: {host:?}"
4305 );
4306 }
4307 None => assert!(
4308 stored_message.options().get("pond").is_none(),
4309 "with no resolvable stamp the spoofed key is still stripped"
4310 ),
4311 }
4312 assert!(
4313 !stored.messages[0].parts[0].options.contains_key("pond"),
4314 "part rows are not stamped (covered by their message's stamp)"
4315 );
4316
4317 Ok(())
4318 }
4319
4320 #[tokio::test(flavor = "multi_thread")]
4328 async fn optimize_indices_compacts_parts_with_blob_column() -> anyhow::Result<()> {
4329 use crate::wire::{FileData, PartKind, Provenance};
4330 let temp = TempDir::new()?;
4331 let store = Store::open_local(temp.path()).await?;
4332
4333 let session = synthetic_session("compact-blob");
4334 store
4335 .upsert_sessions(std::slice::from_ref(&session))
4336 .await?;
4337
4338 let make_part = |idx: usize, kind: PartKind| Part {
4339 session_id: session.id.clone(),
4340 message_id: format!("msg-{idx}"),
4341 id: format!("part-{idx}"),
4342 ordinal: 0,
4343 provenance: Provenance::Conversational,
4344 options: ProviderOptions::new(),
4345 kind,
4346 };
4347
4348 let batch_a = vec![
4349 make_part(
4350 0,
4351 PartKind::File {
4352 media_type: Some("text/plain".to_owned()),
4353 file_name: Some("a.txt".to_owned()),
4354 data: FileData::Bytes(b"alpha".to_vec()),
4355 },
4356 ),
4357 make_part(
4358 1,
4359 PartKind::File {
4360 media_type: Some("text/plain".to_owned()),
4361 file_name: Some("b.txt".to_owned()),
4362 data: FileData::String("beta".to_owned()),
4363 },
4364 ),
4365 ];
4366 store.upsert_parts(&batch_a).await?;
4367
4368 let batch_b = vec![
4369 make_part(
4370 2,
4371 PartKind::File {
4372 media_type: Some("application/octet-stream".to_owned()),
4373 file_name: None,
4374 data: FileData::Url("https://example.com/file".to_owned()),
4375 },
4376 ),
4377 make_part(
4378 3,
4379 PartKind::File {
4380 media_type: Some("image/png".to_owned()),
4381 file_name: Some("c.png".to_owned()),
4382 data: FileData::Bytes(vec![0x89, 0x50, 0x4e, 0x47]),
4383 },
4384 ),
4385 ];
4386 store.upsert_parts(&batch_b).await?;
4387
4388 store
4389 .optimize_indices(None, &MaintenancePolicy::always_compact())
4390 .await?
4391 .into_result()?;
4392
4393 Ok(())
4394 }
4395
4396 #[tokio::test]
4397 async fn file_part_blob_v2_round_trips_through_get() -> anyhow::Result<()> {
4398 let temp = TempDir::new()?;
4399 let store = Store::open_local(temp.path()).await?;
4400 let session = synthetic_session("blob");
4401 let message = Message::User {
4402 id: "message-1".to_owned(),
4403 session_id: session.id.clone(),
4404 timestamp: Utc::now(),
4405 options: ProviderOptions::new(),
4406 };
4407 let part = Part {
4408 session_id: session.id.clone(),
4409 id: "part-1".to_owned(),
4410 message_id: message.id().to_owned(),
4411 ordinal: 0,
4412 provenance: crate::wire::Provenance::Conversational,
4413 options: ProviderOptions::new(),
4414 kind: PartKind::File {
4415 media_type: Some("text/plain".to_owned()),
4416 file_name: Some("payload.txt".to_owned()),
4417 data: FileData::Bytes(b"pond".to_vec()),
4418 },
4419 };
4420
4421 let mut validator = IngestValidator::default();
4422 validator
4423 .push(&store, 0, IngestEvent::Session(session.clone()))
4424 .await?;
4425 validator
4426 .push(&store, 1, IngestEvent::Message(message.clone()))
4427 .await?;
4428 validator
4429 .push(&store, 2, IngestEvent::Part(part.clone()))
4430 .await?;
4431 validator.finish(&store).await?;
4432
4433 let stored = store
4434 .get_session(&session.id)
4435 .await?
4436 .expect("session should exist");
4437 let stored_part = &stored.messages[0].parts[0];
4438 assert_eq!(stored_part, &part);
4439
4440 Ok(())
4441 }
4442
4443 fn base_session() -> Session {
4454 Session {
4455 id: "01HXY00000000001".to_owned(),
4456 parent_session_id: None,
4457 parent_message_id: None,
4458 source_agent: "claude-code".to_owned(),
4459 created_at: Utc::now(),
4460 project: crate::adapter::Extracted::from_test_value("/home/me/proj".to_owned()),
4461 options: ProviderOptions::new(),
4462 }
4463 }
4464
4465 fn count_status(outcomes: &[RowOutcome], target: OutcomeStatus) -> usize {
4466 outcomes
4467 .iter()
4468 .filter(|outcome| outcome.status == target)
4469 .count()
4470 }
4471
4472 #[tokio::test(flavor = "multi_thread")]
4473 async fn re_ingesting_a_session_with_unchanged_immutable_fields_is_idempotent()
4474 -> anyhow::Result<()> {
4475 let temp = TempDir::new()?;
4476 let store = Store::open_local(temp.path()).await?;
4477
4478 let first = ingest_events(&store, vec![IngestEvent::Session(base_session())]).await?;
4479 assert_eq!(count_status(&first, OutcomeStatus::Inserted), 1);
4480
4481 let mut again = base_session();
4482 again.options.insert("title".to_owned(), json!("renamed"));
4483 let second = ingest_events(&store, vec![IngestEvent::Session(again)]).await?;
4484 assert_eq!(
4485 count_status(&second, OutcomeStatus::Error),
4486 0,
4487 "options is mutable; the re-ingest must not surface an error: {second:?}",
4488 );
4489 assert_eq!(
4490 count_status(&second, OutcomeStatus::Matched),
4491 1,
4492 "unchanged immutable fields must match-insert via merge_insert",
4493 );
4494
4495 Ok(())
4496 }
4497
4498 #[tokio::test(flavor = "multi_thread")]
4499 async fn re_ingesting_with_changed_source_agent_is_rejected() -> anyhow::Result<()> {
4500 let temp = TempDir::new()?;
4501 let store = Store::open_local(temp.path()).await?;
4502
4503 let first = ingest_events(&store, vec![IngestEvent::Session(base_session())]).await?;
4504 assert_eq!(count_status(&first, OutcomeStatus::Error), 0);
4505
4506 let mut tampered = base_session();
4507 tampered.source_agent = "codex-cli".to_owned();
4508 let second = ingest_events(&store, vec![IngestEvent::Session(tampered)]).await?;
4509 assert_eq!(count_status(&second, OutcomeStatus::Error), 1);
4510 let err_row = second
4511 .iter()
4512 .find(|outcome| outcome.status == OutcomeStatus::Error)
4513 .expect("error outcome present");
4514 let err = err_row.error.as_ref().expect("error body present");
4515 assert_eq!(err.field, Some("source_agent"));
4516 assert_eq!(err.reason, Some("immutable"));
4517
4518 let stored = store
4520 .get_session(&base_session().id)
4521 .await?
4522 .expect("session row survives the rejected re-ingest");
4523 assert_eq!(stored.session.source_agent, "claude-code");
4524
4525 Ok(())
4526 }
4527
4528 #[tokio::test(flavor = "multi_thread")]
4529 async fn re_ingesting_with_changed_project_is_rejected() -> anyhow::Result<()> {
4530 let temp = TempDir::new()?;
4531 let store = Store::open_local(temp.path()).await?;
4532
4533 let first = ingest_events(&store, vec![IngestEvent::Session(base_session())]).await?;
4534 assert_eq!(count_status(&first, OutcomeStatus::Error), 0);
4535
4536 let mut tampered = base_session();
4537 tampered.project = crate::adapter::Extracted::from_test_value("/somewhere/else".to_owned());
4538 let second = ingest_events(&store, vec![IngestEvent::Session(tampered)]).await?;
4539 let err_row = second
4540 .iter()
4541 .find(|outcome| outcome.status == OutcomeStatus::Error)
4542 .expect("project change must surface an error outcome");
4543 assert_eq!(err_row.error.as_ref().unwrap().field, Some("project"));
4544
4545 let stored = store
4546 .get_session(&base_session().id)
4547 .await?
4548 .expect("session row survives");
4549 assert_eq!(
4550 stored.session.project.as_str(),
4551 "/home/me/proj",
4552 "stored project must remain the original",
4553 );
4554
4555 Ok(())
4556 }
4557
4558 #[tokio::test(flavor = "multi_thread")]
4559 async fn batched_flush_attributes_new_messages_on_existing_session() -> anyhow::Result<()> {
4560 use crate::wire::Provenance;
4568 let temp = TempDir::new()?;
4569 let store = Store::open_local(temp.path()).await?;
4570 let session = base_session();
4571
4572 let text_part = |part_id: &str, message_id: &str, body: &str| Part {
4573 session_id: session.id.clone(),
4574 id: part_id.to_owned(),
4575 message_id: message_id.to_owned(),
4576 ordinal: 0,
4577 provenance: Provenance::Conversational,
4578 options: ProviderOptions::new(),
4579 kind: PartKind::Text {
4580 text: Some(Extracted::from_test_value(body.to_owned())),
4581 },
4582 };
4583 let user_message = |id: &str| Message::User {
4584 id: id.to_owned(),
4585 session_id: session.id.clone(),
4586 timestamp: Utc::now(),
4587 options: ProviderOptions::new(),
4588 };
4589
4590 let mut validator = IngestValidator::default();
4592 validator
4593 .push(&store, 0, IngestEvent::Session(session.clone()))
4594 .await?;
4595 validator
4596 .push(&store, 1, IngestEvent::Message(user_message("m1")))
4597 .await?;
4598 validator
4599 .push(&store, 2, IngestEvent::Part(text_part("p1", "m1", "alpha")))
4600 .await?;
4601 validator
4602 .push(&store, 3, IngestEvent::Message(user_message("m2")))
4603 .await?;
4604 validator
4605 .push(&store, 4, IngestEvent::Part(text_part("p2", "m2", "beta")))
4606 .await?;
4607 let (_first_outcomes, first_counts) = validator.finish(&store).await?;
4608 assert_eq!(first_counts.sessions_inserted, 1);
4609 assert_eq!(first_counts.messages_inserted_total, 2);
4610 assert_eq!(first_counts.messages_inserted_searchable, 2);
4611
4612 let mut validator = IngestValidator::default();
4614 validator
4615 .push(&store, 0, IngestEvent::Session(session.clone()))
4616 .await?;
4617 for (idx, mid) in ["m3", "m4", "m5"].iter().enumerate() {
4618 let pid = format!("p{}", idx + 3);
4619 validator
4620 .push(&store, idx * 2 + 1, IngestEvent::Message(user_message(mid)))
4621 .await?;
4622 validator
4623 .push(
4624 &store,
4625 idx * 2 + 2,
4626 IngestEvent::Part(text_part(&pid, mid, "gamma")),
4627 )
4628 .await?;
4629 }
4630 let (second_outcomes, second_counts) = validator.finish(&store).await?;
4631
4632 assert_eq!(
4633 second_counts.sessions_inserted, 0,
4634 "existing session row must report as Matched, not Inserted",
4635 );
4636 assert_eq!(second_counts.sessions_matched, 1);
4637 assert_eq!(
4638 second_counts.messages_inserted_total, 3,
4639 "the three NEW messages must register as Inserted in BatchCounts",
4640 );
4641 assert_eq!(
4642 second_counts.messages_inserted_searchable, 3,
4643 "all three new messages carry conversational text -> searchable",
4644 );
4645 assert_eq!(second_counts.messages_matched_total, 0);
4646 assert_eq!(second_counts.parts_inserted, 3);
4647 assert_eq!(second_counts.parts_matched, 0);
4648
4649 let session_outcome = second_outcomes
4652 .iter()
4653 .find(|outcome| outcome.kind == "session")
4654 .expect("session-row outcome present");
4655 assert_eq!(session_outcome.status, OutcomeStatus::Matched);
4656 for outcome in &second_outcomes {
4657 if outcome.kind == "message" || outcome.kind == "part" {
4658 assert_eq!(
4659 outcome.status,
4660 OutcomeStatus::Inserted,
4661 "new row must be Inserted, got: {outcome:?}",
4662 );
4663 }
4664 }
4665 Ok(())
4666 }
4667
4668 async fn store_with_messages(
4672 temp: &TempDir,
4673 count: usize,
4674 ) -> anyhow::Result<(Store, Vec<MessageKey>)> {
4675 store_with_messages_at_threshold(temp, count, VECTOR_INDEX_ACTIVATION_ROWS).await
4676 }
4677
4678 async fn store_with_messages_at_threshold(
4681 temp: &TempDir,
4682 count: usize,
4683 _vector_threshold: usize,
4684 ) -> anyhow::Result<(Store, Vec<MessageKey>)> {
4685 let store = Store::open_local(temp.path()).await?;
4686 let sessions = 8.min(count.max(1));
4687 let mut events = Vec::new();
4688 for s in 0..sessions {
4689 events.push(IngestEvent::Session(Session {
4690 id: format!("session-{s}"),
4691 parent_session_id: None,
4692 parent_message_id: None,
4693 source_agent: "claude-code".to_owned(),
4694 created_at: Utc::now(),
4695 project: Extracted::from_test_value(format!("/proj/{}", s % 4)),
4696 options: ProviderOptions::new(),
4697 }));
4698 for i in (s..count).step_by(sessions) {
4699 let message_id = format!("msg-{i}");
4700 events.push(IngestEvent::Message(Message::User {
4701 id: message_id.clone(),
4702 session_id: format!("session-{s}"),
4703 timestamp: Utc::now(),
4704 options: ProviderOptions::new(),
4705 }));
4706 events.push(IngestEvent::Part(Part {
4707 session_id: format!("session-{s}"),
4708 id: format!("{message_id}-part"),
4709 message_id,
4710 ordinal: 0,
4711 provenance: crate::wire::Provenance::Conversational,
4712 options: ProviderOptions::new(),
4713 kind: PartKind::Text {
4714 text: Some(Extracted::from_test_value(format!("synthetic message {i}"))),
4715 },
4716 }));
4717 }
4718 }
4719 ingest_events(&store, events).await?;
4720 let keys = (0..count)
4721 .map(|i| MessageKey {
4722 session_id: format!("session-{}", i % sessions),
4723 message_id: format!("msg-{i}"),
4724 })
4725 .collect();
4726 Ok((store, keys))
4727 }
4728
4729 fn synthetic_vector(seed: usize) -> Vec<f32> {
4731 let mut state = (seed as u64)
4732 .wrapping_mul(0x9E37_79B9_7F4A_7C15)
4733 .wrapping_add(1);
4734 (0..embedding_dim())
4735 .map(|_| {
4736 state = state.wrapping_mul(6364136223846793005).wrapping_add(1);
4737 #[allow(clippy::cast_precision_loss)]
4738 let unit = (state >> 33) as f32 / (1u64 << 31) as f32;
4739 unit - 1.0
4740 })
4741 .collect()
4742 }
4743
4744 fn embedded(keys: &[MessageKey]) -> Vec<EmbeddedMessage> {
4746 keys.iter()
4747 .enumerate()
4748 .map(|(seed, key)| EmbeddedMessage {
4749 session_id: key.session_id.clone(),
4750 id: key.message_id.clone(),
4751 vector: synthetic_vector(seed),
4752 })
4753 .collect()
4754 }
4755
4756 fn embedding_update_batch_with_model(
4757 rows: &[EmbeddedMessage],
4758 model: &str,
4759 ) -> Result<RecordBatch> {
4760 let mut batch = embedding_update_batch(rows)?;
4761 let columns = batch
4762 .columns()
4763 .iter()
4764 .take(3)
4765 .cloned()
4766 .chain(std::iter::once(
4767 Arc::new(StringArray::from(vec![model; rows.len()])) as _,
4768 ))
4769 .collect::<Vec<_>>();
4770 batch = RecordBatch::try_new(batch.schema(), columns)?;
4771 Ok(batch)
4772 }
4773
4774 #[tokio::test]
4775 async fn filtered_vector_scan_pushes_scalar_predicate_into_the_index() -> anyhow::Result<()> {
4776 let temp = TempDir::new()?;
4777 let (store, keys) = store_with_messages(&temp, 4).await?;
4781 store.write_embeddings(&embedded(&keys)).await?;
4782 store
4783 .optimize_indices(None, &MaintenancePolicy::always_compact())
4784 .await?
4785 .into_result()?;
4786
4787 let query = vec![0.01_f32; embedding_dim()];
4788 let plan = store
4789 .explain_vector_plan(
4790 &query,
4791 10,
4792 &Predicate::Eq("session_id", "session-3".into()),
4793 None,
4794 )
4795 .await?;
4796
4797 assert!(
4802 plan.contains("ScalarIndexQuery"),
4803 "expected a ScalarIndexQuery node in the plan:\n{plan}",
4804 );
4805 let predicate_postfiltered = plan
4806 .lines()
4807 .any(|line| line.contains("FilterExec") && line.contains("session_id"));
4808 assert!(
4809 !predicate_postfiltered,
4810 "the scalar predicate must not fall back to a FilterExec postfilter:\n{plan}",
4811 );
4812 Ok(())
4813 }
4814
4815 #[tokio::test]
4816 async fn vector_index_activates_when_threshold_is_crossed() -> anyhow::Result<()> {
4817 let temp = TempDir::new()?;
4818 let (store, keys) = store_with_messages_at_threshold(&temp, 300, 256).await?;
4819
4820 store.write_embeddings(&embedded(&keys[..255])).await?;
4823 store
4824 .optimize_indices_with_vector_threshold(256)
4825 .await?
4826 .into_result()?;
4827 assert!(
4828 !store
4829 .handle
4830 .messages_index_names()
4831 .await?
4832 .iter()
4833 .any(|name| name == MESSAGES_VECTOR_INDEX),
4834 "IVF_PQ must not exist below the activation threshold",
4835 );
4836
4837 store.write_embeddings(&embedded(&keys[255..256])).await?;
4840 store
4841 .optimize_indices_with_vector_threshold(256)
4842 .await?
4843 .into_result()?;
4844 assert!(
4845 store
4846 .handle
4847 .messages_index_names()
4848 .await?
4849 .iter()
4850 .any(|name| name == MESSAGES_VECTOR_INDEX),
4851 "optimize must create the IVF_PQ once the threshold is crossed",
4852 );
4853
4854 let hits = store
4857 .vector_search(&synthetic_vector(0), 10, &Predicate::And(Vec::new()), None)
4858 .await?;
4859 assert!(
4860 hits.iter().any(|(key, _)| key == &keys[0]),
4861 "an embedded row is retrievable via the index",
4862 );
4863 Ok(())
4864 }
4865
4866 #[tokio::test]
4867 async fn model_swap_force_re_embeds_only_stale_rows_and_rebuilds_ivf_pq() -> anyhow::Result<()>
4868 {
4869 let temp = TempDir::new()?;
4870 let (store, keys) = store_with_messages_at_threshold(&temp, 300, 256).await?;
4871 let old_rows = embedded(&keys);
4872 let old_batch = embedding_update_batch_with_model(&old_rows, "old-model")?;
4873 store
4874 .handle
4875 .merge_update(Table::Messages, old_batch, old_rows.len())
4876 .await?;
4877 store
4878 .optimize_indices_with_vector_threshold(256)
4879 .await?
4880 .into_result()?;
4881 assert!(
4882 store
4883 .handle
4884 .messages_index_names()
4885 .await?
4886 .iter()
4887 .any(|name| name == MESSAGES_VECTOR_INDEX),
4888 "IVF_PQ must exist before a model swap",
4889 );
4890 assert_eq!(store.stale_embedding_count().await?, keys.len());
4891
4892 store.drop_vector_index().await?;
4893 let mut pending = Vec::new();
4894 let stream = store.pending_or_stale_messages();
4895 tokio::pin!(stream);
4896 while let Some(row) = stream.next().await {
4897 pending.push(row?);
4898 }
4899 assert_eq!(
4900 pending.len(),
4901 keys.len(),
4902 "force stream should see stale rows"
4903 );
4904 store.write_embeddings(&embedded(&keys)).await?;
4905 assert_eq!(store.stale_embedding_count().await?, 0);
4906 store
4907 .optimize_indices_with_vector_threshold(256)
4908 .await?
4909 .into_result()?;
4910 assert!(
4911 store
4912 .handle
4913 .messages_index_names()
4914 .await?
4915 .iter()
4916 .any(|name| name == MESSAGES_VECTOR_INDEX),
4917 "optimize must rebuild IVF_PQ after force re-embed",
4918 );
4919
4920 let stream = store.pending_or_stale_messages();
4921 tokio::pin!(stream);
4922 assert!(stream.next().await.is_none(), "up-to-date rows are skipped");
4923 Ok(())
4924 }
4925
4926 #[tokio::test]
4927 async fn session_last_ingested_at_falls_back_when_versions_pruned() -> anyhow::Result<()> {
4928 let temp = TempDir::new()?;
4937 let (store, _keys) = store_with_messages(&temp, 4).await?;
4938
4939 for tag in 0..3 {
4942 let extra = synthetic_session(&format!("extra-{tag}"));
4943 store.upsert_sessions(&[extra]).await?;
4944 }
4945
4946 let dataset = store.handle.dataset(Table::Sessions).await?;
4951 dataset
4952 .cleanup_old_versions(chrono::Duration::zero(), None, Some(false))
4953 .await
4954 .context("cleanup_old_versions failed")?;
4955
4956 let map = store.session_last_ingested_at().await?;
4957 let session_count = store.row_counts().await?.0;
4958 assert!(
4959 map.len() >= session_count,
4960 "watermark map ({}) must still cover every session ({}) after \
4961 version cleanup; an empty fallback regresses pond sync to a \
4962 full re-scan",
4963 map.len(),
4964 session_count,
4965 );
4966 Ok(())
4967 }
4968
4969 #[tokio::test]
4970 async fn embedding_progress_counts_embedded_and_eligible_rows() -> anyhow::Result<()> {
4971 let temp = TempDir::new()?;
4972 let (store, keys) = store_with_messages(&temp, 10).await?;
4973
4974 let before = store.embedding_progress().await?;
4975 assert_eq!(before.embedded, 0);
4976 assert_eq!(before.total, 10);
4977 assert_eq!(before.model, crate::embed::model_id());
4978
4979 store.write_embeddings(&embedded(&keys[..4])).await?;
4980 let partial = store.embedding_progress().await?;
4981 assert_eq!(partial.embedded, 4);
4982 assert_eq!(partial.total, 10);
4983
4984 store.write_embeddings(&embedded(&keys[4..])).await?;
4985 let full = store.embedding_progress().await?;
4986 assert_eq!(full.embedded, 10);
4987 assert_eq!(full.total, 10);
4988 Ok(())
4989 }
4990}