1use std::{
2 collections::{BTreeMap, HashMap, HashSet},
3 path::Path,
4 sync::Arc,
5};
6
7use anyhow::{Context, Result};
8use async_stream::try_stream;
9use chrono::{DateTime, TimeZone, Utc};
10use lance::Dataset;
11use lance::dataset::{AutoCleanupParams, WriteMode, WriteParams};
12use lance::deps::arrow_array::{
13 Array, FixedSizeListArray, Float16Array, Float32Array, Int32Array, LargeBinaryArray,
14 LargeStringArray, RecordBatch, RecordBatchIterator, StringArray, TimestampMicrosecondArray,
15 UInt64Array, new_null_array,
16};
17use lance::deps::arrow_schema::{DataType, Field, Schema, TimeUnit};
18use lance_file::version::LanceFileVersion;
19use lance_index::scalar::{BuiltinIndexType, FullTextSearchQuery};
20use serde::{Deserialize, Serialize, de::DeserializeOwned};
21use serde_json::Value;
22use tokio_stream::{Stream, StreamExt};
23
24use crate::{
25 config, embed,
26 substrate::{
27 Handle, IndexIntent, IndexParamsKind, IndexStatus, IndexTrigger, MaintenancePolicy,
28 OptimizeProgressFn, PhaseOutcome, Predicate, ScalarValue, ScanOpts, Table,
29 TableOptimizeOutcome, TableSizes, VECTOR_INDEX_ACTIVATION_ROWS,
30 },
31 wire::{
32 FileData, Message, Part, PartKind, ResponseMode, Role, SUMMARY_PART_TYPES, Session,
33 SessionFrom,
34 },
35};
36use url::Url;
37
/// Store built around a single substrate [`Handle`]; the methods on this
/// type issue their scans and merges through that handle.
#[derive(Debug)]
pub struct Store {
    /// Substrate handle every read and write of this store goes through.
    handle: Handle,
}
42
/// One row count per table (sessions, messages, parts), used by the archive
/// export and import result types.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LanceArchiveCounts {
    /// Count for the sessions table.
    pub sessions: usize,
    /// Count for the messages table.
    pub messages: usize,
    /// Count for the parts table.
    pub parts: usize,
}
49
/// One dataset version id per table; the export path fills these from
/// `Dataset::version_id()` of the source datasets.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LanceArchiveVersions {
    /// Version id of the sessions dataset.
    pub sessions: u64,
    /// Version id of the messages dataset.
    pub messages: u64,
    /// Version id of the parts dataset.
    pub parts: u64,
}
56
/// Result of `Store::export_clean_lance_datasets`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LanceArchiveExport {
    /// Number of rows read from each source table during the export.
    pub rows: LanceArchiveCounts,
    /// Version id each source dataset reported when the export read it.
    pub source_versions: LanceArchiveVersions,
}
62
/// Result of `Store::import_clean_lance_datasets`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LanceArchiveImport {
    /// Number of rows read from each archive table.
    pub rows: LanceArchiveCounts,
    /// Per-table sum of the values returned by the handle's `merge_insert`
    /// calls (presumably the number of newly inserted rows — confirm against
    /// `Handle::merge_insert`).
    pub inserted: LanceArchiveCounts,
}
68
/// Index intents grouped by the table they belong to.
#[derive(Debug, Clone, Default)]
pub struct IndexIntents {
    /// Intents for the sessions table.
    pub sessions: Vec<IndexIntent>,
    /// Intents for the messages table.
    pub messages: Vec<IndexIntent>,
    /// Intents for the parts table.
    pub parts: Vec<IndexIntent>,
}
75
76impl IndexIntents {
77 fn all(&self) -> [(Table, &[IndexIntent]); 3] {
78 [
79 (Table::Sessions, &self.sessions),
80 (Table::Messages, &self.messages),
81 (Table::Parts, &self.parts),
82 ]
83 }
84}
85
/// A message row yielded by the pending-embedding streams: its identifying
/// columns plus the non-null `search_text` read from the messages table.
#[derive(Debug, Clone, PartialEq)]
pub struct PendingMessage {
    /// Value of the row's `session_id` column.
    pub session_id: String,
    /// Value of the row's `id` column.
    pub id: String,
    /// Value of the row's `search_text` column.
    pub search_text: String,
}
95
/// A message identifier paired with a vector; `Store::write_embeddings`
/// takes a slice of these.
#[derive(Debug, Clone, PartialEq)]
pub struct EmbeddedMessage {
    /// Session the message belongs to.
    pub session_id: String,
    /// Message id.
    pub id: String,
    /// Embedding values for the message.
    pub vector: Vec<f32>,
}
104
/// Descriptive fields of a single message.
// NOTE(review): no producer or consumer of this type is visible in this part
// of the file; field meanings below are read off the names — confirm.
#[derive(Debug, Clone, PartialEq)]
pub struct MessageMeta {
    /// Message id.
    pub message_id: String,
    /// Id of the owning session.
    pub session_id: String,
    /// Message role, kept as a string.
    pub role: String,
    /// Project label.
    pub project: String,
    /// Source agent label.
    pub source_agent: String,
    /// Message timestamp (UTC).
    pub timestamp: DateTime<Utc>,
    /// Search text of the message.
    pub search_text: String,
}
116
/// Identifies a message by session id and message id. Derives ordering and
/// hashing, so it can serve as a map or set key.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct MessageKey {
    /// Id of the owning session.
    pub session_id: String,
    /// Id of the message.
    pub message_id: String,
}
122
/// Per-row status returned by the `upsert_*` methods.
// NOTE(review): the mapping from merge results to these variants lives in
// `statuses_from_inserted`, which is not visible here — confirm the exact
// meaning of each variant there.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UpsertStatus {
    /// Presumably: the row was newly inserted.
    Inserted,
    /// Presumably: the row matched an already stored row.
    Matched,
}
128
/// Collected per-table results of an optimize run.
#[derive(Debug, Default)]
pub struct OptimizeOutcome {
    /// One entry per table, each carrying an `indices` and a `compaction`
    /// phase outcome.
    pub tables: Vec<TableOptimizeOutcome>,
}
137
138impl OptimizeOutcome {
139 pub fn any_indices_failed(&self) -> bool {
142 self.tables.iter().any(|t| t.indices.is_failed())
143 }
144
145 pub fn into_result(self) -> Result<Self> {
149 for table in &self.tables {
150 if let PhaseOutcome::Failed(error) = &table.indices {
151 anyhow::bail!(
152 "indices phase failed on {}: {error:#}",
153 table.table.as_str()
154 );
155 }
156 if let PhaseOutcome::Failed(error) = &table.compaction {
157 anyhow::bail!(
158 "compaction phase failed on {}: {error:#}",
159 table.table.as_str()
160 );
161 }
162 }
163 Ok(self)
164 }
165}
166
/// Summary produced by `Store::corpus_stats`.
#[derive(Debug, Clone)]
pub struct CorpusStats {
    /// Location reported by the store's handle.
    pub data_url: Url,
    /// Row counts of the three tables as reported by the handle.
    pub totals: RowTotals,
    /// Per-adapter breakdown, ordered by adapter name.
    pub adapters: Vec<AdapterStats>,
    /// Whether rows whose `source_agent` contains `/` were included in the
    /// breakdown.
    pub include_subagents: bool,
}
186
/// Row counts of the sessions, messages and parts tables.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RowTotals {
    /// Rows in the sessions table.
    pub sessions: u64,
    /// Rows in the messages table.
    pub messages: u64,
    /// Rows in the parts table.
    pub parts: u64,
}
193
/// Snapshot of embedding progress.
// NOTE(review): the code that fills this struct is not visible here; the
// field descriptions follow the field names — confirm against the producer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EmbeddingProgress {
    /// Number of messages counted as embedded.
    pub embedded: usize,
    /// Number of messages counted in total.
    pub total: usize,
    /// Static model identifier string.
    pub model: &'static str,
}
205
/// Statistics for one adapter, i.e. one distinct `source_agent` value seen
/// by `Store::corpus_stats`.
#[derive(Debug, Clone)]
pub struct AdapterStats {
    /// The `source_agent` value this entry aggregates.
    pub adapter: String,
    /// Sum of the `sessions` counts of `projects`.
    pub sessions: u64,
    /// Sum of the `messages` counts of `projects`.
    pub messages: u64,
    /// Per-project breakdown, most messages first, ties broken by project
    /// name.
    pub projects: Vec<ProjectStats>,
}
218
/// Statistics for one (adapter, project) pair as computed by
/// `Store::corpus_stats`.
#[derive(Debug, Clone)]
pub struct ProjectStats {
    /// The `project` value of the aggregated message rows.
    pub project: String,
    /// Number of distinct session ids among those rows.
    pub sessions: u64,
    /// Number of message rows.
    pub messages: u64,
}
225
/// Running tally for one (source_agent, project) group while
/// `Store::corpus_stats` scans the messages table.
#[derive(Default)]
struct GroupAccumulator {
    /// Message rows seen for the group so far.
    messages: u64,
    /// Distinct session ids seen for the group so far.
    session_ids: HashSet<String>,
}
231
/// Borrowed input describing one message handed to `Store::upsert_messages`.
#[derive(Debug, Clone, Copy)]
pub struct MessageWrite<'a> {
    /// The message to write.
    pub message: &'a Message,
    /// Parts accompanying the message. `upsert_messages` does not read this
    /// field.
    pub parts: &'a [Part],
    /// Optional search text written alongside the message.
    pub search_text: Option<&'a str>,
}
238
239impl Store {
240 pub async fn open(location: &Url) -> Result<Self> {
246 Ok(Self {
247 handle: Handle::open(location).await?,
248 })
249 }
250
251 pub async fn open_with_options(
257 location: &Url,
258 storage_options: std::collections::HashMap<String, String>,
259 caps: crate::substrate::RuntimeCaps,
260 ) -> Result<Self> {
261 Ok(Self {
262 handle: Handle::open_with_options(location, storage_options, caps).await?,
263 })
264 }
265
266 pub async fn open_local(path: impl AsRef<std::path::Path>) -> Result<Self> {
271 let url = config::url_for_path(path)?;
272 Self::open_with_options(
273 &url,
274 std::collections::HashMap::new(),
275 crate::substrate::RuntimeCaps::default(),
276 )
277 .await
278 }
279
280 pub async fn export_clean_lance_datasets(&self, dest: &Path) -> Result<LanceArchiveExport> {
288 std::fs::create_dir_all(dest)
289 .with_context(|| format!("failed to create archive staging dir {}", dest.display()))?;
290 let (sessions, sessions_version) = self
291 .export_clean_table(Table::Sessions, &dest.join("sessions.lance"))
292 .await?;
293 let (messages, messages_version) = self
294 .export_clean_table(Table::Messages, &dest.join("messages.lance"))
295 .await?;
296 let (parts, parts_version) = self
297 .export_clean_table(Table::Parts, &dest.join("parts.lance"))
298 .await?;
299 Ok(LanceArchiveExport {
300 rows: LanceArchiveCounts {
301 sessions,
302 messages,
303 parts,
304 },
305 source_versions: LanceArchiveVersions {
306 sessions: sessions_version,
307 messages: messages_version,
308 parts: parts_version,
309 },
310 })
311 }
312
313 pub async fn import_clean_lance_datasets(&self, source: &Path) -> Result<LanceArchiveImport> {
314 let sessions_dataset =
315 open_archive_table(Table::Sessions, &source.join("sessions.lance")).await?;
316 let messages_dataset =
317 open_archive_table(Table::Messages, &source.join("messages.lance")).await?;
318 let parts_dataset = open_archive_table(Table::Parts, &source.join("parts.lance")).await?;
319 let (sessions, sessions_inserted) = self
320 .import_clean_table(Table::Sessions, sessions_dataset)
321 .await?;
322 let (messages, messages_inserted) = self
323 .import_clean_table(Table::Messages, messages_dataset)
324 .await?;
325 let (parts, parts_inserted) = self.import_clean_table(Table::Parts, parts_dataset).await?;
326 Ok(LanceArchiveImport {
327 rows: LanceArchiveCounts {
328 sessions,
329 messages,
330 parts,
331 },
332 inserted: LanceArchiveCounts {
333 sessions: sessions_inserted,
334 messages: messages_inserted,
335 parts: parts_inserted,
336 },
337 })
338 }
339
340 async fn export_clean_table(&self, table: Table, dest: &Path) -> Result<(usize, u64)> {
341 let dataset = self.handle.dataset(table).await?;
342 let source_version = dataset.version_id();
343 let schema = export_schema(table);
344 let mut stream = dataset
345 .scan()
346 .try_into_stream()
347 .await
348 .with_context(|| format!("failed to scan {} for archive export", table.as_str()))?;
349 let dest_uri = dest
350 .to_str()
351 .with_context(|| format!("archive path is not UTF-8: {}", dest.display()))?;
352
353 let mut rows = 0usize;
354 let mut wrote = false;
355 while let Some(batch) = stream.next().await {
356 let batch = batch
357 .with_context(|| format!("failed to read {} archive batch", table.as_str()))?;
358 rows += batch.num_rows();
359 let reader = RecordBatchIterator::new([Ok(batch.clone())], batch.schema());
360 let mut params = write_params_for_create();
361 if wrote {
362 params.mode = WriteMode::Append;
363 }
364 Dataset::write(reader, dest_uri, Some(params))
365 .await
366 .with_context(|| format!("failed to write {} archive table", table.as_str()))?;
367 wrote = true;
368 }
369
370 if !wrote {
371 let batch = RecordBatch::new_empty(schema.clone());
372 let reader = RecordBatchIterator::new([Ok(batch)], schema);
373 Dataset::write(reader, dest_uri, Some(write_params_for_create()))
374 .await
375 .with_context(|| {
376 format!("failed to write empty {} archive table", table.as_str())
377 })?;
378 }
379 Ok((rows, source_version))
380 }
381
382 async fn import_clean_table(&self, table: Table, dataset: Dataset) -> Result<(usize, usize)> {
383 let mut stream = dataset
384 .scan()
385 .try_into_stream()
386 .await
387 .with_context(|| format!("failed to scan {} archive table", table.as_str()))?;
388 let mut rows = 0usize;
389 let mut inserted = 0usize;
390 while let Some(batch) = stream.next().await {
391 let batch = batch
392 .with_context(|| format!("failed to read {} archive batch", table.as_str()))?;
393 let row_count = batch.num_rows();
394 rows += row_count;
395 inserted += self
396 .handle
397 .merge_insert(table, batch, row_count)
398 .await
399 .with_context(|| format!("failed to import {} archive table", table.as_str()))?
400 as usize;
401 }
402 Ok((rows, inserted))
403 }
404
405 pub async fn upsert_sessions(&self, sessions: &[Session]) -> Result<Vec<UpsertStatus>> {
406 if sessions.is_empty() {
407 return Ok(Vec::new());
408 }
409 let batches = sessions_batches(sessions)?;
410 let inserted = merge_insert_chunks(&self.handle, Table::Sessions, batches).await?;
411 Ok(statuses_from_inserted(sessions.len(), inserted))
413 }
414
/// Writes a batch of completed substreams (a session plus its buffered
/// messages and parts) and reports one outcome per row.
///
/// Steps, in order:
/// 1. Substreams sharing a session id are merged into the first occurrence;
///    a later occurrence that disagrees on `source_agent` or `project` is
///    rejected with error outcomes instead.
/// 2. Already stored sessions, message keys and part keys for the involved
///    session ids are read back from the three tables.
/// 3. Substreams whose session fails `ensure_immutable_match` against the
///    stored session are rejected with error outcomes.
/// 4. The remaining sessions, messages and parts are merge-inserted, the
///    three tables concurrently.
/// 5. Success outcomes are produced for what was written.
///
/// The returned outcomes are sorted by their `index`.
///
/// # Errors
///
/// Fails when a lookup scan, a batch build or a merge fails. The three
/// merges run under `try_join!`, so the first error is returned.
// NOTE(review): whether a failed merge can leave the other tables partially
// written depends on `merge_insert_chunks`, which is not visible here.
async fn upsert_session_batch(
    &self,
    substreams: Vec<CompletedSubstream>,
) -> Result<(Vec<RowOutcome>, BatchCounts)> {
    if substreams.is_empty() {
        return Ok((Vec::new(), BatchCounts::default()));
    }

    let mut outcomes: Vec<RowOutcome> = Vec::with_capacity(substreams.len());
    let mut counts = BatchCounts::default();

    // Step 1: collapse in-batch duplicates. `by_session_id` maps a session
    // id to the position of its first occurrence inside `merged`.
    let mut merged: Vec<CompletedSubstream> = Vec::with_capacity(substreams.len());
    let mut by_session_id: std::collections::HashMap<String, usize> =
        std::collections::HashMap::with_capacity(substreams.len());
    for substream in substreams {
        if let Some(&existing_idx) = by_session_id.get(&substream.session.id) {
            let existing = &merged[existing_idx];
            // A repeated session id must agree with the first occurrence on
            // both fields; otherwise the whole later substream is dropped.
            if existing.session.source_agent != substream.session.source_agent
                || existing.session.project != substream.session.project
            {
                // `source_agent` is reported in preference to `project`
                // when both differ.
                let reason = if existing.session.source_agent != substream.session.source_agent
                {
                    IngestError::ImmutableField {
                        field: "source_agent",
                        session_id: substream.session.id.clone(),
                        stored: existing.session.source_agent.clone(),
                        attempted: substream.session.source_agent.clone(),
                    }
                } else {
                    IngestError::ImmutableField {
                        field: "project",
                        session_id: substream.session.id.clone(),
                        stored: (*existing.session.project).clone(),
                        attempted: (*substream.session.project).clone(),
                    }
                };
                let field = match &reason {
                    IngestError::ImmutableField { field, .. } => Some(*field),
                };
                let reason_key = match field {
                    Some("project") => DROP_REASON_IMMUTABLE_PROJECT,
                    Some("source_agent") => DROP_REASON_IMMUTABLE_SOURCE_AGENT,
                    _ => DROP_REASON_UNCATEGORIZED,
                };
                outcomes.extend(error_outcomes_for_substream(
                    substream.session_index,
                    &substream.session,
                    &substream.messages,
                    reason.to_string(),
                    field,
                    reason_key,
                ));
                continue;
            }
            // Compatible duplicate: append only the messages whose id the
            // first occurrence does not carry yet.
            let existing = &mut merged[existing_idx];
            let mut seen: std::collections::HashSet<String> = existing
                .messages
                .iter()
                .map(|m| m.message.id().to_owned())
                .collect();
            for msg in substream.messages {
                if seen.insert(msg.message.id().to_owned()) {
                    existing.messages.push(msg);
                }
            }
            continue;
        }
        by_session_id.insert(substream.session.id.clone(), merged.len());
        merged.push(substream);
    }

    // Step 2: read back what is already stored for the involved sessions.
    let session_id_values: Vec<ScalarValue> = merged
        .iter()
        .map(|substream| ScalarValue::String(substream.session.id.clone()))
        .collect();
    let existing_sessions: std::collections::HashMap<String, Session> =
        if session_id_values.is_empty() {
            std::collections::HashMap::new()
        } else {
            let batch = self
                .handle
                .scan_batch(
                    Table::Sessions,
                    Some(&Predicate::In("id", session_id_values.clone())),
                    &[],
                )
                .await?;
            let mut map = std::collections::HashMap::with_capacity(batch.num_rows());
            for row in 0..batch.num_rows() {
                let session = session_from_batch(&batch, row)?;
                map.insert(session.id.clone(), session);
            }
            map
        };
    // Stored message keys as (session_id, message id).
    let existing_message_pks: HashSet<(String, String)> = if session_id_values.is_empty() {
        HashSet::new()
    } else {
        let batch = self
            .handle
            .scan_batch(
                Table::Messages,
                Some(&Predicate::In("session_id", session_id_values.clone())),
                &["session_id", "id"],
            )
            .await?;
        let mut set = HashSet::with_capacity(batch.num_rows());
        for row in 0..batch.num_rows() {
            let sid = string(&batch, "session_id", row)?.context("session_id is null")?;
            let mid = string(&batch, "id", row)?.context("message id is null")?;
            set.insert((sid, mid));
        }
        set
    };
    // Stored part keys as (session_id, message_id, part id).
    let existing_part_pks: HashSet<(String, String, String)> = if session_id_values.is_empty() {
        HashSet::new()
    } else {
        let batch = self
            .handle
            .scan_batch(
                Table::Parts,
                Some(&Predicate::In("session_id", session_id_values)),
                &["session_id", "message_id", "id"],
            )
            .await?;
        let mut set = HashSet::with_capacity(batch.num_rows());
        for row in 0..batch.num_rows() {
            let sid = string(&batch, "session_id", row)?.context("session_id is null")?;
            let mid = string(&batch, "message_id", row)?.context("message_id is null")?;
            let pid = string(&batch, "id", row)?.context("part id is null")?;
            set.insert((sid, mid, pid));
        }
        set
    };

    // Step 3: reject substreams that conflict with the stored session.
    let mut writeable: Vec<CompletedSubstream> = Vec::with_capacity(merged.len());
    for substream in merged {
        if let Some(existing) = existing_sessions.get(&substream.session.id)
            && let Err(failure) = ensure_immutable_match(existing, &substream.session)
        {
            let field = match &failure {
                IngestError::ImmutableField { field, .. } => Some(*field),
            };
            let reason_key = match field {
                Some("project") => DROP_REASON_IMMUTABLE_PROJECT,
                Some("source_agent") => DROP_REASON_IMMUTABLE_SOURCE_AGENT,
                _ => DROP_REASON_UNCATEGORIZED,
            };
            outcomes.extend(error_outcomes_for_substream(
                substream.session_index,
                &substream.session,
                &substream.messages,
                failure.to_string(),
                field,
                reason_key,
            ));
            continue;
        }
        writeable.push(substream);
    }

    // Nothing survived validation: only error outcomes are reported.
    if writeable.is_empty() {
        outcomes.sort_by_key(|outcome| outcome.index);
        return Ok((outcomes, counts));
    }

    // Step 4: flatten the surviving substreams into per-table rows.
    let sessions_owned: Vec<Session> = writeable
        .iter()
        .map(|substream| substream.session.clone())
        .collect();
    // Message rows take `source_agent` and `project` from their session.
    let message_rows: Vec<MessageBatchRow<'_>> = writeable
        .iter()
        .flat_map(|substream| {
            substream.messages.iter().map(|buffered| MessageBatchRow {
                message: &buffered.message,
                source_agent: &substream.session.source_agent,
                project: &substream.session.project,
                search_text: buffered.search_text.as_deref(),
            })
        })
        .collect();
    let part_rows: Vec<Part> = writeable
        .iter()
        .flat_map(|substream| {
            substream.messages.iter().flat_map(|buffered| {
                buffered
                    .parts
                    .iter()
                    .map(|buffered_part| buffered_part.part.clone())
            })
        })
        .collect();

    let session_batches = sessions_batches(&sessions_owned)?;
    let message_batches = messages_batches(&message_rows)?;
    let part_batches = parts_batches(&part_rows)?;

    // The three merges run concurrently; their inserted counts are unused
    // because outcomes are derived from the pre-read key sets instead.
    let (_sessions_inserted, _messages_inserted, _parts_inserted) = tokio::try_join!(
        merge_insert_chunks(&self.handle, Table::Sessions, session_batches),
        merge_insert_chunks(&self.handle, Table::Messages, message_batches),
        merge_insert_chunks(&self.handle, Table::Parts, part_batches),
    )?;

    // Step 5: success outcomes, computed against the state read in step 2.
    for substream in &writeable {
        outcomes.extend(success_outcomes_for_substream(
            substream.session_index,
            &substream.session,
            &substream.messages,
            &existing_sessions,
            &existing_message_pks,
            &existing_part_pks,
            &mut counts,
        ));
    }

    outcomes.sort_by_key(|outcome| outcome.index);
    Ok((outcomes, counts))
}
677
678 pub async fn upsert_messages(
679 &self,
680 session: &Session,
681 messages: &[MessageWrite<'_>],
682 ) -> Result<Vec<UpsertStatus>> {
683 if messages.is_empty() {
684 return Ok(Vec::new());
685 }
686
687 let rows = messages
688 .iter()
689 .map(|write| MessageBatchRow {
690 message: write.message,
691 source_agent: &session.source_agent,
692 project: &session.project,
693 search_text: write.search_text,
694 })
695 .collect::<Vec<_>>();
696 let batches = messages_batches(&rows)?;
697 let inserted = merge_insert_chunks(&self.handle, Table::Messages, batches).await?;
698 Ok(statuses_from_inserted(messages.len(), inserted))
699 }
700
701 pub async fn upsert_parts(&self, parts: &[Part]) -> Result<Vec<UpsertStatus>> {
702 if parts.is_empty() {
703 return Ok(Vec::new());
704 }
705 let batches = parts_batches(parts)?;
706 let inserted = merge_insert_chunks(&self.handle, Table::Parts, batches).await?;
707 Ok(statuses_from_inserted(parts.len(), inserted))
708 }
709
710 pub async fn get_session(&self, session_id: &str) -> Result<Option<SessionWithMessages>> {
711 let Some(session) = self.find_session(session_id).await? else {
712 return Ok(None);
713 };
714 let messages = self.messages_for_session(session_id).await?;
715 Ok(Some(SessionWithMessages { session, messages }))
716 }
717
718 pub async fn session_ids(&self) -> Result<Vec<String>> {
720 let batch = self
721 .handle
722 .scan_batch(Table::Sessions, None, &["id"])
723 .await?;
724 let mut ids = Vec::with_capacity(batch.num_rows());
725 for row in 0..batch.num_rows() {
726 if let Some(id) = string(&batch, "id", row)? {
727 ids.push(id);
728 }
729 }
730 Ok(ids)
731 }
732
733 pub async fn child_sessions(&self, parent_session_id: &str) -> Result<Vec<Session>> {
734 let batch = self
735 .handle
736 .scan_batch(
737 Table::Sessions,
738 Some(&Predicate::Eq(
739 "parent_session_id",
740 parent_session_id.into(),
741 )),
742 &[
743 "id",
744 "parent_session_id",
745 "parent_message_id",
746 "source_agent",
747 "created_at",
748 "project",
749 "options",
750 ],
751 )
752 .await?;
753 let mut sessions = Vec::with_capacity(batch.num_rows());
754 for row in 0..batch.num_rows() {
755 sessions.push(session_from_batch(&batch, row)?);
756 }
757 sessions.sort_by(|left, right| left.id.cmp(&right.id));
758 Ok(sessions)
759 }
760
761 pub async fn session_last_ingested_at(&self) -> Result<HashMap<String, DateTime<Utc>>> {
767 use lance::deps::arrow_array::UInt64Array;
768
769 let dataset = self.handle.dataset(Table::Sessions).await?;
770 let version_list = dataset.versions().await?;
771 let versions: HashMap<u64, DateTime<Utc>> = version_list
772 .iter()
773 .map(|v| (v.version, v.timestamp))
774 .collect();
775 let oldest_visible_ts = version_list.iter().map(|v| v.timestamp).min();
783
784 let scanner = self
785 .handle
786 .scan(
787 Table::Sessions,
788 ScanOpts::project_only(&["id", "_row_last_updated_at_version"]),
789 )
790 .await?;
791 let mut stream = scanner.try_into_stream().await?;
792 let mut out: HashMap<String, DateTime<Utc>> = HashMap::new();
793 while let Some(batch) = stream.next().await {
794 let batch = batch?;
795 let version_array = batch
796 .column_by_name("_row_last_updated_at_version")
797 .context("missing _row_last_updated_at_version column")?
798 .as_any()
799 .downcast_ref::<UInt64Array>()
800 .context("_row_last_updated_at_version is not UInt64")?;
801 for row in 0..batch.num_rows() {
802 let Some(id) = string(&batch, "id", row)? else {
803 continue;
804 };
805 if version_array.is_null(row) {
806 continue;
807 }
808 let version = version_array.value(row);
809 let ts = versions.get(&version).copied().or(oldest_visible_ts);
810 if let Some(ts) = ts {
811 out.insert(id, ts);
812 }
813 }
814 }
815 Ok(out)
816 }
817
818 pub async fn session_view(
825 &self,
826 session_id: &str,
827 params: SessionViewParams<'_>,
828 ) -> Result<GetLookup<SessionPage>> {
829 let Some(session) = self.find_session(session_id).await? else {
830 return Ok(GetLookup::NotFound);
831 };
832
833 let mut rows = match params.mode {
834 ResponseMode::Conversational => self
835 .scan_conversational_messages(session_id)
836 .await?
837 .into_iter()
838 .map(|row| ScanRow {
839 id: row.message_id,
840 role: row.role,
841 timestamp: row.timestamp,
842 text: Some(row.text.into_inner()),
843 content: None,
844 })
845 .collect(),
846 ResponseMode::Complete | ResponseMode::Verbatim => {
847 self.scan_all_messages(session_id).await?
848 }
849 };
850 rows.sort_by(|a, b| a.timestamp.cmp(&b.timestamp).then_with(|| a.id.cmp(&b.id)));
851
852 let start_at = match params.after_id {
853 Some(after) => match rows.iter().position(|row| row.id == after) {
856 Some(idx) => idx + 1,
857 None => return Ok(GetLookup::UnknownAfterId),
858 },
859 None => 0,
860 };
861 let remaining = rows.get(start_at..).unwrap_or(&[]);
862 let (emitted, messages_remaining) = match params.session_from {
863 SessionFrom::Start => {
864 let n = page_by(remaining, params.limit, params.budget_bytes, |row| {
865 row.text.as_deref().map_or(0, str::len)
866 });
867 (&remaining[..n], remaining.len() - n)
868 }
869 SessionFrom::End => {
873 let mut bytes = 0usize;
874 let mut start = remaining.len();
875 for row in remaining.iter().rev() {
876 if remaining.len() - start >= params.limit {
877 break;
878 }
879 let size = row.text.as_deref().map_or(0, str::len);
880 if start < remaining.len() && bytes + size > params.budget_bytes {
881 break;
882 }
883 bytes += size;
884 start -= 1;
885 }
886 (&remaining[start..], start)
887 }
888 };
889 let ids: Vec<String> = emitted.iter().map(|row| row.id.clone()).collect();
890
891 let mut parts_by_message = match params.mode {
894 ResponseMode::Verbatim => self.parts_for_messages(session_id, &ids).await?,
895 ResponseMode::Conversational | ResponseMode::Complete => {
896 self.summary_parts_for_messages(session_id, &ids).await?
897 }
898 };
899 let messages = emitted
900 .iter()
901 .map(|row| RetrievedMessage {
902 id: row.id.clone(),
903 role: row.role,
904 timestamp: row.timestamp,
905 text: row.text.clone(),
906 content: row.content.clone(),
907 parts: parts_by_message
908 .remove(&(session_id.to_owned(), row.id.clone()))
909 .unwrap_or_default(),
910 })
911 .collect();
912
913 Ok(GetLookup::Found(SessionPage {
914 session,
915 messages,
916 messages_remaining,
917 }))
918 }
919
920 pub async fn message_view(
926 &self,
927 message_id: &str,
928 params: MessageViewParams<'_>,
929 ) -> Result<GetLookup<MessagePage>> {
930 let Some(session_id) = self.session_id_for_message(message_id).await? else {
931 return Ok(GetLookup::NotFound);
932 };
933 let Some(session) = self.find_session(&session_id).await? else {
934 return Ok(GetLookup::NotFound);
935 };
936 let mut rows = self.scan_all_messages(&session_id).await?;
937 rows.sort_by(|a, b| a.timestamp.cmp(&b.timestamp).then_with(|| a.id.cmp(&b.id)));
938 let Some(target_pos) = rows.iter().position(|row| row.id == message_id) else {
939 return Ok(GetLookup::NotFound);
940 };
941
942 let start = target_pos.saturating_sub(params.context_depth);
943 let end = (target_pos + params.context_depth + 1).min(rows.len());
944 let window = &rows[start..end];
945 let window_ids: Vec<String> = window.iter().map(|row| row.id.clone()).collect();
946 let mut parts_by_message = self.parts_for_messages(&session_id, &window_ids).await?;
949
950 let all_parts = parts_by_message
951 .remove(&(session_id.clone(), message_id.to_owned()))
952 .unwrap_or_default();
953 let start_part = match params.after_id {
954 Some(after) => match all_parts.iter().find(|part| part.id == after) {
958 Some(anchor) => all_parts
959 .iter()
960 .position(|part| part.ordinal > anchor.ordinal)
961 .unwrap_or(all_parts.len()),
962 None => return Ok(GetLookup::UnknownAfterId),
963 },
964 None => 0,
965 };
966 let remaining_parts = all_parts.get(start_part..).unwrap_or(&[]);
967 let part_count = page_by(remaining_parts, params.limit, params.budget_bytes, |part| {
968 serde_json::to_string(part).map_or(0, |json| json.len())
969 });
970 let target_parts = remaining_parts[..part_count].to_vec();
971 let target_parts_remaining = remaining_parts.len() - part_count;
972
973 let target_row = &rows[target_pos];
974 let target = RetrievedMessage {
975 id: target_row.id.clone(),
976 role: target_row.role,
977 timestamp: target_row.timestamp,
978 text: target_row.text.clone(),
979 content: target_row.content.clone(),
980 parts: Vec::new(),
982 };
983 let siblings = window
984 .iter()
985 .enumerate()
986 .filter(|(idx, _)| start + idx != target_pos)
987 .map(|(_, row)| RetrievedMessage {
988 id: row.id.clone(),
989 role: row.role,
990 timestamp: row.timestamp,
991 text: row.text.clone(),
992 content: row.content.clone(),
993 parts: parts_by_message
994 .get(&(session_id.clone(), row.id.clone()))
995 .cloned()
996 .unwrap_or_default(),
997 })
998 .collect();
999
1000 Ok(GetLookup::Found(MessagePage {
1001 session,
1002 target,
1003 target_parts,
1004 target_parts_remaining,
1005 siblings,
1006 }))
1007 }
1008
1009 async fn scan_all_messages(&self, session_id: &str) -> Result<Vec<ScanRow>> {
1010 let batch = self
1011 .handle
1012 .scan_batch(
1013 Table::Messages,
1014 Some(&Predicate::Eq("session_id", session_id.into())),
1015 &["id", "timestamp", "role", "search_text", "content"],
1016 )
1017 .await?;
1018 let mut rows = Vec::with_capacity(batch.num_rows());
1019 for row in 0..batch.num_rows() {
1020 let id = string(&batch, "id", row)?.context("message id is null")?;
1021 let role =
1022 role_from_str(&string(&batch, "role", row)?.context("message role is null")?)?;
1023 let timestamp = datetime(&batch, "timestamp", row)?;
1024 rows.push(ScanRow {
1025 id,
1026 role,
1027 timestamp,
1028 text: string(&batch, "search_text", row)?,
1029 content: string(&batch, "content", row)?,
1030 });
1031 }
1032 Ok(rows)
1033 }
1034
1035 pub async fn scan_conversational_messages(
1039 &self,
1040 session_id: &str,
1041 ) -> Result<Vec<ConversationalRow>> {
1042 let filter = Predicate::And(vec![
1043 Predicate::Eq("session_id", session_id.into()),
1044 Predicate::IsNotNull("search_text"),
1045 ]);
1046 let batch = self
1047 .handle
1048 .scan_batch(
1049 Table::Messages,
1050 Some(&filter),
1051 &["id", "timestamp", "role", "search_text"],
1052 )
1053 .await?;
1054
1055 let mut rows = Vec::with_capacity(batch.num_rows());
1056 for row in 0..batch.num_rows() {
1057 let message_id = string(&batch, "id", row)?.context("message id is null")?;
1058 let role =
1059 role_from_str(&string(&batch, "role", row)?.context("message role is null")?)?;
1060 let timestamp = datetime(&batch, "timestamp", row)?;
1061 let text_str = string(&batch, "search_text", row)?.context(
1062 "search_text null after IsNotNull pushdown - storage invariant violated",
1063 )?;
1064 rows.push(ConversationalRow {
1065 session_id: session_id.to_owned(),
1066 message_id,
1067 role,
1068 timestamp,
1069 text: SearchText(text_str),
1070 });
1071 }
1072 rows.sort_by(|a, b| {
1073 a.timestamp
1074 .cmp(&b.timestamp)
1075 .then_with(|| a.message_id.cmp(&b.message_id))
1076 });
1077 Ok(rows)
1078 }
1079
1080 pub async fn session_id_for_message(&self, message_id: &str) -> Result<Option<String>> {
1083 let batch = self
1084 .handle
1085 .scan_batch(
1086 Table::Messages,
1087 Some(&Predicate::Eq("id", message_id.into())),
1088 &["session_id"],
1089 )
1090 .await?;
1091 if batch.num_rows() == 0 {
1092 return Ok(None);
1093 }
1094 string(&batch, "session_id", 0)
1095 }
1096
1097 pub async fn row_counts(&self) -> Result<(usize, usize, usize)> {
1098 self.handle.row_counts().await
1099 }
1100
1101 pub async fn dataset(&self, table: Table) -> Result<Arc<Dataset>> {
1105 Ok(Arc::new(self.handle.dataset(table).await?))
1106 }
1107
/// Forwards `name` and `bytes` to the handle's `export_write` and returns
/// its result unchanged.
// NOTE(review): where and how the bytes are stored is decided by
// `Handle::export_write`, which is not visible here.
pub async fn export_write(&self, name: &str, bytes: &[u8]) -> Result<()> {
    self.handle.export_write(name, bytes).await
}
1112
/// Forwards `name` to the handle's `export_read` and returns the bytes it
/// yields.
// NOTE(review): presumably the counterpart of `export_write` — confirm
// against `Handle::export_read`.
pub async fn export_read(&self, name: &str) -> Result<Vec<u8>> {
    self.handle.export_read(name).await
}
1117
/// Forwards `name` to the handle's `export_local_path`.
// NOTE(review): the `None` case is decided by the handle; presumably it
// means the export has no local filesystem path — confirm there.
pub fn export_local_path(&self, name: &str) -> Option<std::path::PathBuf> {
    self.handle.export_local_path(name)
}
1122
1123 pub async fn corpus_stats(&self, include_subagents: bool) -> Result<CorpusStats> {
1129 let scanner = self
1130 .handle
1131 .scan(
1132 Table::Messages,
1133 ScanOpts::project_only(&["source_agent", "project", "session_id"]),
1134 )
1135 .await?;
1136 let mut stream = scanner.try_into_stream().await?;
1137 let mut groups: HashMap<(String, String), GroupAccumulator> = HashMap::new();
1138 while let Some(batch) = stream.next().await {
1139 let batch = batch?;
1140 for row in 0..batch.num_rows() {
1141 let source_agent = string(&batch, "source_agent", row)?.unwrap_or_default();
1142 let project = string(&batch, "project", row)?.unwrap_or_default();
1143 let session_id = string(&batch, "session_id", row)?.unwrap_or_default();
1144 let is_subagent = source_agent.contains('/');
1145 if is_subagent && !include_subagents {
1146 continue;
1147 }
1148 let entry = groups.entry((source_agent, project)).or_default();
1149 entry.messages += 1;
1150 entry.session_ids.insert(session_id);
1151 }
1152 }
1153
1154 let (totals_sessions, totals_messages, totals_parts) = self.handle.row_counts().await?;
1155 let totals = RowTotals {
1156 sessions: totals_sessions as u64,
1157 messages: totals_messages as u64,
1158 parts: totals_parts as u64,
1159 };
1160
1161 let mut by_adapter: BTreeMap<String, Vec<ProjectStats>> = BTreeMap::new();
1162 for ((adapter, project), acc) in groups {
1163 by_adapter.entry(adapter).or_default().push(ProjectStats {
1164 project,
1165 sessions: acc.session_ids.len() as u64,
1166 messages: acc.messages,
1167 });
1168 }
1169
1170 let mut adapters = Vec::with_capacity(by_adapter.len());
1171 for (adapter, mut projects) in by_adapter {
1172 projects.sort_by(|a, b| {
1173 b.messages
1174 .cmp(&a.messages)
1175 .then_with(|| a.project.cmp(&b.project))
1176 });
1177 let sessions: u64 = projects.iter().map(|p| p.sessions).sum();
1178 let messages: u64 = projects.iter().map(|p| p.messages).sum();
1179 adapters.push(AdapterStats {
1180 adapter,
1181 sessions,
1182 messages,
1183 projects,
1184 });
1185 }
1186
1187 Ok(CorpusStats {
1188 data_url: self.handle.location().clone(),
1189 totals,
1190 adapters,
1191 include_subagents,
1192 })
1193 }
1194
1195 pub async fn write_embeddings(&self, rows: &[EmbeddedMessage]) -> Result<()> {
1200 if rows.is_empty() {
1201 return Ok(());
1202 }
1203 let batch = embedding_update_batch(rows)?;
1204 self.handle
1205 .merge_update(Table::Messages, batch, rows.len())
1206 .await?;
1207 Ok(())
1208 }
1209
1210 pub fn pending_embedding_messages(&self) -> impl Stream<Item = Result<PendingMessage>> + '_ {
1213 try_stream! {
1214 let filter = Predicate::And(vec![
1215 Predicate::IsNull("vector"),
1216 Predicate::IsNotNull("search_text"),
1217 ]);
1218 let projection: &[&str] = &["session_id", "id", "search_text"];
1219 let scanner = self
1220 .handle
1221 .scan(
1222 Table::Messages,
1223 ScanOpts::with_predicate_and_projection(&filter, projection),
1224 )
1225 .await?;
1226 let mut batches = scanner
1227 .try_into_stream()
1228 .await
1229 .context("failed to open messages stream")?;
1230 while let Some(batch) = batches.next().await {
1231 let batch = batch?;
1232 for row in 0..batch.num_rows() {
1233 yield PendingMessage {
1234 session_id: string(&batch, "session_id", row)?
1235 .context("session_id is null")?,
1236 id: string(&batch, "id", row)?.context("message id is null")?,
1237 search_text: string(&batch, "search_text", row)?
1238 .context("search_text is null")?,
1239 };
1240 }
1241 }
1242 }
1243 }
1244
1245 pub fn pending_or_stale_messages(&self) -> impl Stream<Item = Result<PendingMessage>> + '_ {
1250 try_stream! {
1251 let filter = Predicate::And(vec![
1252 Predicate::IsNotNull("search_text"),
1253 Predicate::Or(vec![
1254 Predicate::IsNull("vector"),
1255 Predicate::Ne("embedding_model", embed::model_id().into()),
1256 ]),
1257 ]);
1258 let projection: &[&str] = &["session_id", "id", "search_text"];
1259 let scanner = self
1260 .handle
1261 .scan(
1262 Table::Messages,
1263 ScanOpts::with_predicate_and_projection(&filter, projection),
1264 )
1265 .await?;
1266 let mut batches = scanner
1267 .try_into_stream()
1268 .await
1269 .context("failed to open pending-or-stale messages stream")?;
1270 while let Some(batch) = batches.next().await {
1271 let batch = batch?;
1272 for row in 0..batch.num_rows() {
1273 yield PendingMessage {
1274 session_id: string(&batch, "session_id", row)?
1275 .context("session_id is null")?,
1276 id: string(&batch, "id", row)?.context("message id is null")?,
1277 search_text: string(&batch, "search_text", row)?
1278 .context("search_text is null")?,
1279 };
1280 }
1281 }
1282 }
1283 }
1284
1285 pub async fn fts_search(
1287 &self,
1288 query: &str,
1289 limit: usize,
1290 filter: &Predicate,
1291 ) -> Result<Vec<(MessageKey, f32)>> {
1292 let mut scanner = self.handle.scanner(Table::Messages, Some(filter)).await?;
1293 scanner.full_text_search(
1294 FullTextSearchQuery::new(query.to_owned()).with_column("search_text".to_owned())?,
1295 )?;
1296 scanner.disable_scoring_autoprojection();
1302 scanner.project(&["session_id", "id", "_score"])?;
1303 scanner.limit(Some(i64::try_from(limit).unwrap_or(i64::MAX)), None)?;
1304 let batch = scanner.try_into_batch().await?;
1305 let mut hits = Vec::with_capacity(batch.num_rows());
1306 for row in 0..batch.num_rows() {
1307 let key = MessageKey {
1308 session_id: string(&batch, "session_id", row)?.context("session_id is null")?,
1309 message_id: string(&batch, "id", row)?.context("fts hit id is null")?,
1310 };
1311 hits.push((key, float32(&batch, "_score", row)?));
1312 }
1313 hits.sort_by(|left, right| {
1321 right
1322 .1
1323 .partial_cmp(&left.1)
1324 .unwrap_or(std::cmp::Ordering::Equal)
1325 .then_with(|| left.0.session_id.cmp(&right.0.session_id))
1326 .then_with(|| left.0.message_id.cmp(&right.0.message_id))
1327 });
1328 Ok(hits)
1329 }
1330
1331 pub async fn has_embeddings(&self) -> Result<bool> {
1336 let scope = Predicate::IsNotNull("vector");
1337 let mut scanner = self
1338 .handle
1339 .scan(
1340 Table::Messages,
1341 ScanOpts::with_predicate_and_projection(&scope, &["id"]),
1342 )
1343 .await?;
1344 scanner.limit(Some(1), None)?;
1345 let batch = scanner.try_into_batch().await?;
1346 Ok(batch.num_rows() > 0)
1347 }
1348
1349 pub async fn vector_search(
1357 &self,
1358 query: &[f32],
1359 limit: usize,
1360 filter: &Predicate,
1361 search: Option<&config::SearchConfig>,
1362 ) -> Result<Vec<(MessageKey, f32)>> {
1363 let scope = embedded_scope(filter);
1364 let mut scanner = self.handle.scanner(Table::Messages, Some(&scope)).await?;
1365 let key = Float32Array::from(query.to_vec());
1366 scanner.nearest("vector", &key, limit)?;
1367 if let Some(nprobes) = search.and_then(|cfg| cfg.nprobes) {
1368 scanner.nprobes(nprobes);
1369 }
1370 if let Some(refine_factor) = search.and_then(|cfg| cfg.refine_factor) {
1371 scanner.refine(refine_factor);
1372 }
1373 scanner.disable_scoring_autoprojection();
1377 scanner.project(&["session_id", "id", "_distance"])?;
1378 let batch = scanner.try_into_batch().await?;
1379 let mut hits = Vec::with_capacity(batch.num_rows());
1380 for row in 0..batch.num_rows() {
1381 let key = MessageKey {
1382 session_id: string(&batch, "session_id", row)?.context("session_id is null")?,
1383 message_id: string(&batch, "id", row)?.context("message id is null")?,
1384 };
1385 hits.push((key, float32(&batch, "_distance", row)?));
1386 }
1387 hits.sort_by(|left, right| {
1393 left.1
1394 .partial_cmp(&right.1)
1395 .unwrap_or(std::cmp::Ordering::Equal)
1396 .then_with(|| left.0.session_id.cmp(&right.0.session_id))
1397 .then_with(|| left.0.message_id.cmp(&right.0.message_id))
1398 });
1399 Ok(hits)
1400 }
1401
1402 pub async fn explain_vector_plan(
1405 &self,
1406 query: &[f32],
1407 limit: usize,
1408 filter: &Predicate,
1409 search: Option<&config::SearchConfig>,
1410 ) -> Result<String> {
1411 let scope = embedded_scope(filter);
1412 let mut scanner = self.handle.scanner(Table::Messages, Some(&scope)).await?;
1413 let key = Float32Array::from(query.to_vec());
1414 scanner.nearest("vector", &key, limit)?;
1415 if let Some(nprobes) = search.and_then(|cfg| cfg.nprobes) {
1416 scanner.nprobes(nprobes);
1417 }
1418 if let Some(refine_factor) = search.and_then(|cfg| cfg.refine_factor) {
1419 scanner.refine(refine_factor);
1420 }
1421 scanner
1422 .explain_plan(true)
1423 .await
1424 .context("explain_plan failed")
1425 }
1426
1427 pub async fn explain_fts_plan(
1428 &self,
1429 query: &str,
1430 limit: usize,
1431 filter: &Predicate,
1432 ) -> Result<String> {
1433 let mut scanner = self.handle.scanner(Table::Messages, Some(filter)).await?;
1434 scanner.full_text_search(
1435 FullTextSearchQuery::new(query.to_owned()).with_column("search_text".to_owned())?,
1436 )?;
1437 scanner.project(&["session_id", "id"])?;
1438 scanner.limit(Some(i64::try_from(limit).unwrap_or(i64::MAX)), None)?;
1439 scanner
1440 .explain_plan(true)
1441 .await
1442 .context("explain_plan failed")
1443 }
1444
1445 pub async fn message_metas_by_keys(&self, keys: &[MessageKey]) -> Result<Vec<MessageMeta>> {
1447 if keys.is_empty() {
1448 return Ok(Vec::new());
1449 }
1450 let wanted = keys.iter().cloned().collect::<HashSet<_>>();
1451 let session_ids = keys
1452 .iter()
1453 .map(|key| key.session_id.clone())
1454 .collect::<Vec<_>>();
1455 let message_ids = keys
1456 .iter()
1457 .map(|key| key.message_id.clone())
1458 .collect::<Vec<_>>();
1459 let predicate = Predicate::And(vec![
1460 in_predicate("session_id", &session_ids),
1461 in_predicate("id", &message_ids),
1462 ]);
1463 let batch = self
1464 .handle
1465 .scan_batch(
1466 Table::Messages,
1467 Some(&predicate),
1468 &[
1469 "id",
1470 "session_id",
1471 "role",
1472 "project",
1473 "source_agent",
1474 "timestamp",
1475 "search_text",
1476 ],
1477 )
1478 .await?;
1479 let mut metas = Vec::with_capacity(batch.num_rows());
1480 for row in 0..batch.num_rows() {
1481 let message_id = string(&batch, "id", row)?.context("id is null")?;
1482 let session_id = string(&batch, "session_id", row)?.context("session_id is null")?;
1483 if !wanted.contains(&MessageKey {
1484 session_id: session_id.clone(),
1485 message_id: message_id.clone(),
1486 }) {
1487 continue;
1488 }
1489 metas.push(MessageMeta {
1490 message_id,
1491 session_id,
1492 role: string(&batch, "role", row)?.context("role is null")?,
1493 project: string(&batch, "project", row)?.context("project is null")?,
1494 source_agent: string(&batch, "source_agent", row)?
1495 .context("source_agent is null")?,
1496 timestamp: datetime(&batch, "timestamp", row)?,
1497 search_text: string(&batch, "search_text", row)?.unwrap_or_default(),
1498 });
1499 }
1500 Ok(metas)
1501 }
1502
1503 pub async fn session_message_counts(
1505 &self,
1506 session_ids: &[String],
1507 ) -> Result<BTreeMap<String, usize>> {
1508 if session_ids.is_empty() {
1509 return Ok(BTreeMap::new());
1510 }
1511 let dataset = self.handle.dataset(Table::Messages).await?;
1512 let mut tasks = tokio::task::JoinSet::new();
1513 for session_id in session_ids {
1514 let dataset = dataset.clone();
1515 let session_id = session_id.clone();
1516 tasks.spawn(async move {
1517 let filter = Predicate::Eq("session_id", session_id.as_str().into()).to_lance();
1518 let count = dataset.count_rows(Some(filter)).await?;
1519 anyhow::Ok((session_id, count))
1520 });
1521 }
1522 let mut counts = BTreeMap::new();
1523 while let Some(joined) = tasks.join_next().await {
1524 let (session_id, count) = joined.context("session count task panicked")??;
1525 counts.insert(session_id, count);
1526 }
1527 Ok(counts)
1528 }
1529
    /// Returns the handle's unindexed-row count for the messages table with
    /// respect to the `MESSAGES_FTS_INDEX` index.
    pub async fn unindexed_message_backlog(&self) -> Result<usize> {
        self.handle
            .unindexed_row_count(Table::Messages, MESSAGES_FTS_INDEX)
            .await
    }
1537
    /// Returns the handle's unindexed-row count for the messages table with
    /// respect to the `MESSAGES_VECTOR_INDEX` index.
    pub async fn unindexed_vector_backlog(&self) -> Result<usize> {
        self.handle
            .unindexed_row_count(Table::Messages, MESSAGES_VECTOR_INDEX)
            .await
    }
1548
1549 pub async fn embedding_progress(&self) -> Result<EmbeddingProgress> {
1556 let dataset = self.handle.dataset(Table::Messages).await?;
1557 let embedded = dataset
1558 .count_rows(Some(Predicate::IsNotNull("vector").to_lance()))
1559 .await?;
1560 let total = dataset
1561 .count_rows(Some(Predicate::IsNotNull("search_text").to_lance()))
1562 .await?;
1563 Ok(EmbeddingProgress {
1564 embedded,
1565 total,
1566 model: embed::model_id(),
1567 })
1568 }
1569
1570 pub async fn stale_embedding_count(&self) -> Result<usize> {
1574 let dataset = self.handle.dataset(Table::Messages).await?;
1575 dataset
1576 .count_rows(Some(
1577 Predicate::And(vec![
1578 Predicate::IsNotNull("vector"),
1579 Predicate::Ne("embedding_model", embed::model_id().into()),
1580 ])
1581 .to_lance(),
1582 ))
1583 .await
1584 .map_err(Into::into)
1585 }
1586
1587 pub async fn optimize_indices(
1593 &self,
1594 progress: Option<OptimizeProgressFn>,
1595 maintenance: &MaintenancePolicy,
1596 ) -> Result<OptimizeOutcome> {
1597 let intents = pond_index_intents();
1598 let mut tables = Vec::with_capacity(3);
1599 for (table, intents) in intents.all() {
1600 let outcome = self
1601 .handle
1602 .optimize_table(table, intents, progress.as_ref(), maintenance)
1603 .await;
1604 tables.push(outcome);
1605 }
1606 Ok(OptimizeOutcome { tables })
1607 }
1608
1609 pub async fn build_indices_only(
1615 &self,
1616 progress: Option<OptimizeProgressFn>,
1617 ) -> Result<OptimizeOutcome> {
1618 let policy = pond_index_intents();
1619 let mut tables = Vec::with_capacity(3);
1620 for (table, intents) in policy.all() {
1621 let indices = self
1622 .handle
1623 .optimize_table_indices_only(table, intents, progress.as_ref())
1624 .await;
1625 tables.push(TableOptimizeOutcome {
1626 table,
1627 indices,
1628 compaction: PhaseOutcome::NotAttempted,
1629 });
1630 }
1631 Ok(OptimizeOutcome { tables })
1632 }
1633
1634 #[cfg(test)]
1635 async fn optimize_indices_with_vector_threshold(
1636 &self,
1637 vector_threshold: usize,
1638 ) -> Result<OptimizeOutcome> {
1639 let intents = pond_index_intents_with_vector_threshold(vector_threshold);
1640 let policy = MaintenancePolicy::always_compact();
1641 let mut tables = Vec::with_capacity(3);
1642 for (table, intents) in intents.all() {
1643 let outcome = self
1644 .handle
1645 .optimize_table(table, intents, None, &policy)
1646 .await;
1647 tables.push(outcome);
1648 }
1649 Ok(OptimizeOutcome { tables })
1650 }
1651
1652 pub async fn rebuild_indices(&self, intent_name: Option<&str>) -> Result<()> {
1653 let policy = pond_index_intents();
1654 let mut matched = false;
1655 for (table, intents) in policy.all() {
1656 for intent in intents {
1657 if intent_name.is_none_or(|name| name == intent.name) {
1658 matched = true;
1659 self.handle.rebuild_index(table, intent).await?;
1660 }
1661 }
1662 }
1663 if let Some(name) = intent_name
1664 && !matched
1665 {
1666 anyhow::bail!("unknown index intent {name:?}");
1667 }
1668 Ok(())
1669 }
1670
1671 pub async fn index_status(&self) -> Result<Vec<IndexStatus>> {
1672 let policy = pond_index_intents();
1673 let mut statuses = Vec::new();
1674 for (table, intents) in policy.all() {
1675 statuses.extend(self.handle.index_status(table, intents).await?);
1676 }
1677 Ok(statuses)
1678 }
1679
1680 pub async fn drop_vector_index(&self) -> Result<()> {
1684 match self
1685 .handle
1686 .drop_index(Table::Messages, MESSAGES_VECTOR_INDEX)
1687 .await
1688 {
1689 Ok(()) => Ok(()),
1690 Err(error) => {
1691 let msg = error.to_string();
1692 if msg.contains("not found") || msg.contains("does not exist") {
1693 Ok(())
1694 } else {
1695 Err(error)
1696 }
1697 }
1698 }
1699 }
1700
    /// Returns the table sizes reported by the underlying handle.
    pub async fn table_sizes(&self) -> Result<TableSizes> {
        self.handle.table_sizes().await
    }
1706
1707 async fn find_session(&self, session_id: &str) -> Result<Option<Session>> {
1708 let batch = self
1709 .handle
1710 .scan_batch(
1711 Table::Sessions,
1712 Some(&Predicate::Eq("id", session_id.into())),
1713 &[],
1714 )
1715 .await?;
1716 if batch.num_rows() == 0 {
1717 Ok(None)
1718 } else {
1719 Ok(Some(session_from_batch(&batch, 0)?))
1720 }
1721 }
1722
1723 pub async fn message_vector_by_id(&self, message_id: &str) -> Result<Option<Vec<f32>>> {
1729 let batch = self
1730 .handle
1731 .scan_batch(
1732 Table::Messages,
1733 Some(&Predicate::Eq("id", message_id.into())),
1734 &["vector"],
1735 )
1736 .await?;
1737 if batch.num_rows() == 0 {
1738 return Ok(None);
1739 }
1740 let column = batch
1741 .column(0)
1742 .as_any()
1743 .downcast_ref::<FixedSizeListArray>();
1744 let Some(list) = column else {
1745 return Ok(None);
1746 };
1747 if list.is_null(0) {
1748 return Ok(None);
1749 }
1750 let values = list.value(0);
1751 let halves = values
1752 .as_any()
1753 .downcast_ref::<Float16Array>()
1754 .context("messages.vector inner array is not Float16")?;
1755 let widened = (0..halves.len())
1756 .map(|i| halves.value(i).to_f32())
1757 .collect();
1758 Ok(Some(widened))
1759 }
1760
1761 async fn messages_for_session(&self, session_id: &str) -> Result<Vec<MessageWithParts>> {
1762 let batch = self
1763 .handle
1764 .scan_batch(
1765 Table::Messages,
1766 Some(&Predicate::Eq("session_id", session_id.into())),
1767 &[
1768 "session_id",
1769 "id",
1770 "timestamp",
1771 "role",
1772 "content",
1773 "options",
1774 ],
1775 )
1776 .await?;
1777 let mut messages = Vec::with_capacity(batch.num_rows());
1778 for row in 0..batch.num_rows() {
1779 messages.push(message_from_batch(&batch, row)?);
1780 }
1781 messages.sort_by(|left, right| {
1782 left.timestamp()
1783 .cmp(&right.timestamp())
1784 .then_with(|| left.id().cmp(right.id()))
1785 });
1786
1787 let message_ids = messages
1788 .iter()
1789 .map(|message| message.id().to_owned())
1790 .collect::<Vec<_>>();
1791 let mut parts_by_message = self.parts_for_messages(session_id, &message_ids).await?;
1792
1793 Ok(messages
1794 .into_iter()
1795 .map(|message| {
1796 let key = (message.session_id().to_owned(), message.id().to_owned());
1797 let parts = parts_by_message.remove(&key).unwrap_or_default();
1798 MessageWithParts { message, parts }
1799 })
1800 .collect())
1801 }
1802
    /// Loads all parts (no type restriction) for the given messages of one
    /// session, keyed by `(session_id, message_id)`.
    pub async fn parts_for_messages(
        &self,
        session_id: &str,
        message_ids: &[String],
    ) -> Result<BTreeMap<(String, String), Vec<Part>>> {
        self.scan_parts(session_id, message_ids, None).await
    }
1813
    /// Loads only the parts whose `type` is listed in `SUMMARY_PART_TYPES`
    /// for the given messages of one session, keyed by
    /// `(session_id, message_id)`.
    pub async fn summary_parts_for_messages(
        &self,
        session_id: &str,
        message_ids: &[String],
    ) -> Result<BTreeMap<(String, String), Vec<Part>>> {
        self.scan_parts(session_id, message_ids, Some(SUMMARY_PART_TYPES))
            .await
    }
1826
    /// Loads parts for the given messages of one session, grouped by
    /// `(session_id, message_id)` and sorted by `ordinal` within each group.
    ///
    /// When `part_types` is `Some`, only parts whose `type` is in the list are
    /// scanned. An empty `message_ids` slice returns an empty map without
    /// scanning. For rows of type `"file"` the `data` blob is fetched by row
    /// address and combined with `variant_data` into the part's payload.
    ///
    /// # Errors
    ///
    /// Fails if the scan or blob fetch fails, if a `"file"` row has null
    /// `variant_data`, or if a row cannot be decoded into a `Part`.
    async fn scan_parts(
        &self,
        session_id: &str,
        message_ids: &[String],
        part_types: Option<&[&str]>,
    ) -> Result<BTreeMap<(String, String), Vec<Part>>> {
        if message_ids.is_empty() {
            return Ok(BTreeMap::new());
        }
        let mut clauses = vec![
            Predicate::Eq("session_id", session_id.into()),
            in_predicate("message_id", message_ids),
        ];
        if let Some(types) = part_types {
            clauses.push(Predicate::In(
                "type",
                types.iter().map(|&t| t.into()).collect(),
            ));
        }
        let predicate = Predicate::And(clauses);
        // Dataset handle used further down to fetch blobs by row address.
        // NOTE(review): it is opened separately from the scanner below —
        // confirm both are guaranteed to observe the same table version.
        let dataset = std::sync::Arc::new(self.handle.dataset(Table::Parts).await?);
        let mut scanner = self
            .handle
            .scan(
                Table::Parts,
                ScanOpts::with_predicate_and_projection(
                    &predicate,
                    &[
                        "session_id",
                        "message_id",
                        "id",
                        "ordinal",
                        "type",
                        "provenance",
                        "variant_data",
                        "options",
                    ],
                ),
            )
            .await?;
        // Adds the `_rowaddr` column that is read from the batch below.
        scanner.with_row_address();
        let batch = scanner.try_into_batch().await.context("scan failed")?;
        let row_addresses = uint64(&batch, "_rowaddr")?;
        // Payloads keyed by batch row index, filled only for "file" rows.
        let mut file_payloads = BTreeMap::<usize, FileData>::new();
        // (batch row, row address, variant_data bytes) for each "file" row.
        let mut file_rows = Vec::<(usize, u64, Vec<u8>)>::new();
        for row in 0..batch.num_rows() {
            if string(&batch, "type", row)?.as_deref() == Some("file") {
                let variant_data =
                    json_column(&batch, "variant_data", row)?.context("variant_data is null")?;
                file_rows.push((row, row_addresses.value(row), variant_data));
            }
        }
        if !file_rows.is_empty() {
            let addresses = file_rows
                .iter()
                .map(|(_, address, _)| *address)
                .collect::<Vec<_>>();
            let blobs = dataset.take_blobs_by_addresses(&addresses, "data").await?;
            // Blobs are paired with file rows positionally.
            // NOTE(review): assumes one blob per requested address, returned in
            // request order; `zip` silently stops at the shorter side — confirm.
            for ((row, _, variant_data), blob) in file_rows.into_iter().zip(blobs) {
                let payload = file_data_from_blob(&variant_data, &blob.read().await?)?;
                file_payloads.insert(row, payload);
            }
        }
        let mut parts_by_message = BTreeMap::<(String, String), Vec<Part>>::new();
        for row in 0..batch.num_rows() {
            // Rows without a fetched payload are decoded with `None`.
            let part = part_from_batch(&batch, row, file_payloads.remove(&row))?;
            parts_by_message
                .entry((part.session_id.clone(), part.message_id.clone()))
                .or_default()
                .push(part);
        }
        for parts in parts_by_message.values_mut() {
            parts.sort_by_key(|part| part.ordinal);
        }
        Ok(parts_by_message)
    }
1906}
1907
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "kind", content = "data", rename_all = "snake_case")]
/// One event of an ingest stream.
///
/// Serialized with the variant name (snake_case) under `"kind"` and the
/// payload under `"data"`.
pub enum IngestEvent {
    /// A session record.
    Session(Session),
    /// A message record.
    Message(Message),
    /// A message part record.
    Part(Part),
}
1915
#[derive(Debug, Clone, PartialEq, Eq, Default)]
/// Running totals for an ingest run.
///
/// NOTE(review): the `skipped_*`, `storage_errors` and `truncated_values`
/// counters are only summed by `merge` in the code visible here; their exact
/// meaning is set by callers — confirm before relying on the descriptions.
pub struct IngestSummary {
    /// Rows inserted across sessions, messages and parts.
    pub inserted: usize,
    /// Rows matched across sessions, messages and parts.
    pub matched: usize,
    /// Session rows inserted.
    pub sessions_inserted: usize,
    /// Message rows inserted, searchable or not.
    pub messages_inserted_total: usize,
    /// Inserted message rows flagged as searchable.
    pub messages_inserted_searchable: usize,
    /// Part rows inserted.
    pub parts_inserted: usize,
    /// Session rows matched.
    pub sessions_matched: usize,
    /// Message rows matched, searchable or not.
    pub messages_matched_total: usize,
    /// Matched message rows flagged as searchable.
    pub messages_matched_searchable: usize,
    /// Part rows matched.
    pub parts_matched: usize,
    /// Error outcomes whose kind is not `"session"`.
    pub dropped_events: usize,
    /// Error outcomes whose kind is `"session"`.
    pub dropped_sessions: usize,
    /// Presumably source files skipped — TODO confirm.
    pub skipped_files: usize,
    /// Presumably inputs skipped for being empty — TODO confirm.
    pub skipped_empty: usize,
    /// Presumably inputs skipped for being up to date — TODO confirm.
    pub skipped_fresh: usize,
    /// Presumably storage-level failures — TODO confirm.
    pub storage_errors: usize,
    /// Presumably values that were truncated — TODO confirm.
    pub truncated_values: usize,
    /// Error outcomes counted per reason key; errors without a reason key are
    /// counted under `DROP_REASON_UNCATEGORIZED`.
    pub drop_reasons: BTreeMap<&'static str, usize>,
}
1991
// Stable reason keys attached to error outcomes (`RowError::reason_key`) and
// used as keys of `IngestSummary::drop_reasons`.

/// A message id was seen twice within one session substream.
pub const DROP_REASON_DUPLICATE_MESSAGE_ID: &str = "duplicate_message_id";
/// A `(message_id, part_id)` pair was seen twice within one session substream.
pub const DROP_REASON_DUPLICATE_PART_KEY: &str = "duplicate_part_key";
/// A message arrived while no session was open.
pub const DROP_REASON_MESSAGE_BEFORE_SESSION: &str = "message_before_session";
/// A message's session id differs from the open session's id.
pub const DROP_REASON_MESSAGE_SESSION_MISMATCH: &str = "message_session_mismatch";
/// A part arrived while no message was open.
pub const DROP_REASON_PART_BEFORE_MESSAGE: &str = "part_before_message";
/// A part's session id or message id differs from the open message's.
pub const DROP_REASON_PART_MESSAGE_MISMATCH: &str = "part_message_mismatch";
/// A session's `source_agent` is empty after trimming.
pub const DROP_REASON_EMPTY_SOURCE_AGENT: &str = "empty_source_agent";
/// A session has `parent_message_id` but no `parent_session_id`.
pub const DROP_REASON_PARENT_MESSAGE_WITHOUT_SESSION: &str = "parent_message_without_session";
/// NOTE(review): not used in the code visible here; presumably an attempt to
/// change a stored session's project — confirm.
pub const DROP_REASON_IMMUTABLE_PROJECT: &str = "immutable_project";
/// NOTE(review): not used in the code visible here; presumably an attempt to
/// change a stored session's source agent — confirm.
pub const DROP_REASON_IMMUTABLE_SOURCE_AGENT: &str = "immutable_source_agent";
/// Fallback key for error outcomes that carry no reason key.
pub const DROP_REASON_UNCATEGORIZED: &str = "uncategorized";
2008
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
/// Per-table inserted/matched row counts for one batch.
///
/// Folded into an `IngestSummary` by `IngestSummary::add_batch`.
pub struct BatchCounts {
    /// Session rows inserted.
    pub sessions_inserted: usize,
    /// Session rows matched.
    pub sessions_matched: usize,
    /// Message rows inserted, searchable or not.
    pub messages_inserted_total: usize,
    /// Inserted message rows counted as searchable.
    pub messages_inserted_searchable: usize,
    /// Message rows matched, searchable or not.
    pub messages_matched_total: usize,
    /// Matched message rows counted as searchable.
    pub messages_matched_searchable: usize,
    /// Part rows inserted.
    pub parts_inserted: usize,
    /// Part rows matched.
    pub parts_matched: usize,
}
2027
2028impl IngestSummary {
2029 pub fn accepted(&self) -> usize {
2030 self.inserted + self.matched
2031 }
2032
2033 pub fn add_batch(&mut self, counts: &BatchCounts) {
2037 self.sessions_inserted += counts.sessions_inserted;
2038 self.sessions_matched += counts.sessions_matched;
2039 self.messages_inserted_total += counts.messages_inserted_total;
2040 self.messages_inserted_searchable += counts.messages_inserted_searchable;
2041 self.messages_matched_total += counts.messages_matched_total;
2042 self.messages_matched_searchable += counts.messages_matched_searchable;
2043 self.parts_inserted += counts.parts_inserted;
2044 self.parts_matched += counts.parts_matched;
2045 self.inserted +=
2046 counts.sessions_inserted + counts.messages_inserted_total + counts.parts_inserted;
2047 self.matched +=
2048 counts.sessions_matched + counts.messages_matched_total + counts.parts_matched;
2049 }
2050
2051 pub fn merge(&mut self, other: &Self) {
2055 self.inserted += other.inserted;
2056 self.matched += other.matched;
2057 self.sessions_inserted += other.sessions_inserted;
2058 self.messages_inserted_total += other.messages_inserted_total;
2059 self.messages_inserted_searchable += other.messages_inserted_searchable;
2060 self.parts_inserted += other.parts_inserted;
2061 self.sessions_matched += other.sessions_matched;
2062 self.messages_matched_total += other.messages_matched_total;
2063 self.messages_matched_searchable += other.messages_matched_searchable;
2064 self.parts_matched += other.parts_matched;
2065 self.dropped_events += other.dropped_events;
2066 self.dropped_sessions += other.dropped_sessions;
2067 self.skipped_files += other.skipped_files;
2068 self.skipped_empty += other.skipped_empty;
2069 self.skipped_fresh += other.skipped_fresh;
2070 self.storage_errors += other.storage_errors;
2071 self.truncated_values += other.truncated_values;
2072 for (key, value) in &other.drop_reasons {
2073 *self.drop_reasons.entry(key).or_insert(0) += value;
2074 }
2075 }
2076
2077 pub fn add_outcomes_errors_only(&mut self, outcomes: &[RowOutcome]) {
2082 for outcome in outcomes {
2083 if !matches!(outcome.status, OutcomeStatus::Error) {
2084 continue;
2085 }
2086 if outcome.kind == "session" {
2087 self.dropped_sessions += 1;
2088 } else {
2089 self.dropped_events += 1;
2090 }
2091 let reason = outcome
2092 .error
2093 .as_ref()
2094 .and_then(|error| error.reason_key)
2095 .unwrap_or(DROP_REASON_UNCATEGORIZED);
2096 *self.drop_reasons.entry(reason).or_insert(0) += 1;
2097 }
2098 }
2099
2100 pub fn add_outcomes(&mut self, outcomes: &[RowOutcome]) {
2101 for outcome in outcomes {
2102 match outcome.status {
2103 OutcomeStatus::Inserted => {
2104 self.inserted += 1;
2105 match outcome.kind {
2106 "session" => self.sessions_inserted += 1,
2107 "message" => {
2108 self.messages_inserted_total += 1;
2109 if outcome.searchable {
2110 self.messages_inserted_searchable += 1;
2111 }
2112 }
2113 "part" => self.parts_inserted += 1,
2114 _ => {}
2115 }
2116 }
2117 OutcomeStatus::Matched => {
2118 self.matched += 1;
2119 match outcome.kind {
2120 "session" => self.sessions_matched += 1,
2121 "message" => {
2122 self.messages_matched_total += 1;
2123 if outcome.searchable {
2124 self.messages_matched_searchable += 1;
2125 }
2126 }
2127 "part" => self.parts_matched += 1,
2128 _ => {}
2129 }
2130 }
2131 OutcomeStatus::Error => {
2132 if outcome.kind == "session" {
2138 self.dropped_sessions += 1;
2139 } else {
2140 self.dropped_events += 1;
2141 }
2142 let reason = outcome
2143 .error
2144 .as_ref()
2145 .and_then(|e| e.reason_key)
2146 .unwrap_or(DROP_REASON_UNCATEGORIZED);
2147 *self.drop_reasons.entry(reason).or_insert(0) += 1;
2148 }
2149 }
2150 }
2151 }
2152}
2153
#[derive(Debug, Clone, PartialEq)]
/// Result of processing one ingest event.
pub struct RowOutcome {
    /// Index of the event, as passed in by the caller.
    pub index: usize,
    /// Row kind: `"session"`, `"message"` or `"part"` in the code shown here.
    pub kind: &'static str,
    /// Primary key as JSON: a string for sessions, an array of id strings for
    /// messages and parts.
    pub pk: Value,
    /// Whether the row was inserted, matched, or rejected.
    pub status: OutcomeStatus,
    /// Error details; set on the error outcomes built in this file.
    pub error: Option<RowError>,
    /// Whether a message row counts toward the searchable tallies; always
    /// `false` on error outcomes built in this file.
    pub searchable: bool,
}
2171
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// Disposition of one ingested row.
pub enum OutcomeStatus {
    /// The row is counted as newly inserted.
    Inserted,
    /// The row is counted as matching an existing row.
    Matched,
    /// The row was rejected; details are in `RowOutcome::error`.
    Error,
}
2178
#[derive(Debug, Clone, PartialEq, Eq)]
/// Details of a rejected row.
pub struct RowError {
    /// Human-readable description of the failure.
    pub message: String,
    /// Name of the offending field, when one can be singled out.
    pub field: Option<&'static str>,
    /// Short reason tag; the code shown here sets `"immutable"` or `None`.
    pub reason: Option<&'static str>,
    /// Key used to tally the error in `IngestSummary::drop_reasons`.
    pub reason_key: Option<&'static str>,
}
2192
#[derive(Debug)]
/// A session event held by the validator together with its event index.
struct BufferedSession {
    /// Index of the originating event.
    index: usize,
    /// The buffered session.
    session: Session,
}
2201
#[derive(Debug)]
/// A message event held by the validator together with its parts.
struct BufferedMessage {
    /// Index of the originating event.
    index: usize,
    /// The buffered message.
    message: Message,
    /// Parts attached to the message; filled when the message is flushed.
    parts: Vec<BufferedPart>,
    /// Search text derived from the message and its parts when the message is
    /// flushed; `None` until then.
    search_text: Option<String>,
}
2209
#[derive(Debug)]
/// A part event held by the validator together with its event index.
struct BufferedPart {
    /// Index of the originating event.
    index: usize,
    /// The buffered part.
    part: Part,
}
2215
#[derive(Debug, Default)]
/// Buffers an ordered stream of session / message / part events, rejects
/// events that arrive out of order or reference the wrong parent, and
/// collects finished session substreams until they are flushed to the store.
pub struct IngestValidator {
    /// Session whose substream is currently open, if any.
    session: Option<BufferedSession>,
    /// Message currently accepting parts, if any.
    current_message: Option<BufferedMessage>,
    /// Parts accepted for `current_message` so far.
    current_parts: Vec<BufferedPart>,
    /// Finished messages of the open substream.
    messages: Vec<BufferedMessage>,
    /// Message ids already accepted in the open substream.
    seen_message_ids: HashSet<String>,
    /// `(message_id, part_id)` pairs already accepted in the open substream.
    seen_part_keys: HashSet<(String, String)>,
    /// Closed substreams waiting for the next flush.
    completed: Vec<CompletedSubstream>,
}
2250
#[derive(Debug)]
/// A closed session substream: the session plus its buffered messages.
struct CompletedSubstream {
    /// Index of the session event that opened the substream.
    session_index: usize,
    /// The session itself.
    session: Session,
    /// Messages (with their parts) accepted for the session.
    messages: Vec<BufferedMessage>,
}
2258
2259impl IngestValidator {
2260 pub async fn push(
2266 &mut self,
2267 store: &Store,
2268 index: usize,
2269 event: IngestEvent,
2270 ) -> Result<Vec<RowOutcome>> {
2271 match event {
2272 IngestEvent::Session(session) => self.push_session(store, index, session).await,
2273 IngestEvent::Message(message) => Ok(self.push_message(index, message)),
2274 IngestEvent::Part(part) => Ok(self.push_part(index, part)),
2275 }
2276 }
2277
2278 pub async fn finish(&mut self, store: &Store) -> Result<(Vec<RowOutcome>, BatchCounts)> {
2283 self.close_current_substream();
2284 self.flush(store).await
2285 }
2286
2287 pub async fn flush(&mut self, store: &Store) -> Result<(Vec<RowOutcome>, BatchCounts)> {
2294 if self.completed.is_empty() {
2295 return Ok((Vec::new(), BatchCounts::default()));
2296 }
2297 let completed = std::mem::take(&mut self.completed);
2298 store.upsert_session_batch(completed).await
2299 }
2300
2301 pub fn pending_substreams(&self) -> usize {
2304 self.completed.len()
2305 }
2306
2307 async fn push_session(
2308 &mut self,
2309 _store: &Store,
2310 index: usize,
2311 mut session: Session,
2312 ) -> Result<Vec<RowOutcome>> {
2313 self.close_current_substream();
2317
2318 let trimmed = session.source_agent.trim();
2323 if trimmed.is_empty() {
2324 return Ok(vec![RowOutcome {
2325 index,
2326 kind: "session",
2327 pk: Value::String(session.id.clone()),
2328 status: OutcomeStatus::Error,
2329 error: Some(RowError {
2330 message: format!("session {} has empty source_agent after trim", session.id),
2331 field: Some("source_agent"),
2332 reason: None,
2333 reason_key: Some(DROP_REASON_EMPTY_SOURCE_AGENT),
2334 }),
2335 searchable: false,
2336 }]);
2337 }
2338 if trimmed.len() != session.source_agent.len() {
2339 session.source_agent = trimmed.to_owned();
2340 }
2341
2342 if session.parent_message_id.is_some() && session.parent_session_id.is_none() {
2343 return Ok(vec![RowOutcome {
2344 index,
2345 kind: "session",
2346 pk: Value::String(session.id.clone()),
2347 status: OutcomeStatus::Error,
2348 error: Some(RowError {
2349 message: format!(
2350 "session {} has parent_message_id without parent_session_id",
2351 session.id,
2352 ),
2353 field: Some("parent_message_id"),
2354 reason: None,
2355 reason_key: Some(DROP_REASON_PARENT_MESSAGE_WITHOUT_SESSION),
2356 }),
2357 searchable: false,
2358 }]);
2359 }
2360
2361 self.seen_message_ids.clear();
2362 self.seen_part_keys.clear();
2363 self.session = Some(BufferedSession { index, session });
2364 Ok(Vec::new())
2365 }
2366
2367 fn close_current_substream(&mut self) {
2368 self.flush_current_message();
2369 let Some(BufferedSession {
2370 index: session_index,
2371 session,
2372 }) = self.session.take()
2373 else {
2374 return;
2375 };
2376 let messages = std::mem::take(&mut self.messages);
2377 self.seen_message_ids.clear();
2378 self.seen_part_keys.clear();
2379 self.completed.push(CompletedSubstream {
2380 session_index,
2381 session,
2382 messages,
2383 });
2384 }
2385
2386 fn push_message(&mut self, index: usize, message: Message) -> Vec<RowOutcome> {
2387 let pk = Value::Array(vec![
2388 Value::String(message.session_id().to_owned()),
2389 Value::String(message.id().to_owned()),
2390 ]);
2391 let Some(session) = &self.session else {
2392 return vec![error_outcome(
2393 index,
2394 "message",
2395 pk,
2396 "first event in a session stream must be Session",
2397 None,
2398 DROP_REASON_MESSAGE_BEFORE_SESSION,
2399 )];
2400 };
2401 if message.session_id() != session.session.id {
2402 let msg = format!(
2403 "message {} references session {}, expected {}",
2404 message.id(),
2405 message.session_id(),
2406 session.session.id
2407 );
2408 return vec![error_outcome(
2409 index,
2410 "message",
2411 pk,
2412 &msg,
2413 Some("session_id"),
2414 DROP_REASON_MESSAGE_SESSION_MISMATCH,
2415 )];
2416 }
2417 if !self.seen_message_ids.insert(message.id().to_owned()) {
2418 let msg = format!("duplicate message id {} in session substream", message.id());
2422 return vec![error_outcome(
2423 index,
2424 "message",
2425 pk,
2426 &msg,
2427 None,
2428 DROP_REASON_DUPLICATE_MESSAGE_ID,
2429 )];
2430 }
2431 self.flush_current_message();
2432 self.current_message = Some(BufferedMessage {
2433 index,
2434 message,
2435 parts: Vec::new(),
2436 search_text: None,
2437 });
2438 Vec::new()
2439 }
2440
2441 fn push_part(&mut self, index: usize, part: Part) -> Vec<RowOutcome> {
2442 let pk = Value::Array(vec![
2443 Value::String(part.session_id.clone()),
2444 Value::String(part.message_id.clone()),
2445 Value::String(part.id.clone()),
2446 ]);
2447 let Some(current) = &self.current_message else {
2448 return vec![error_outcome(
2449 index,
2450 "part",
2451 pk,
2452 "part event appeared before a message",
2453 None,
2454 DROP_REASON_PART_BEFORE_MESSAGE,
2455 )];
2456 };
2457 if part.session_id != current.message.session_id() {
2458 let msg = format!(
2459 "part {} references session {}, expected {}",
2460 part.id,
2461 part.session_id,
2462 current.message.session_id()
2463 );
2464 return vec![error_outcome(
2465 index,
2466 "part",
2467 pk,
2468 &msg,
2469 Some("session_id"),
2470 DROP_REASON_PART_MESSAGE_MISMATCH,
2471 )];
2472 }
2473 if part.message_id != current.message.id() {
2474 let msg = format!(
2475 "part {} references message {}, expected {}",
2476 part.id,
2477 part.message_id,
2478 current.message.id()
2479 );
2480 return vec![error_outcome(
2481 index,
2482 "part",
2483 pk,
2484 &msg,
2485 Some("message_id"),
2486 DROP_REASON_PART_MESSAGE_MISMATCH,
2487 )];
2488 }
2489 let part_key = (part.message_id.clone(), part.id.clone());
2490 if !self.seen_part_keys.insert(part_key) {
2491 let msg = format!(
2492 "duplicate part id {} for message {} in session substream",
2493 part.id, part.message_id
2494 );
2495 return vec![error_outcome(
2496 index,
2497 "part",
2498 pk,
2499 &msg,
2500 None,
2501 DROP_REASON_DUPLICATE_PART_KEY,
2502 )];
2503 }
2504 self.current_parts.push(BufferedPart { index, part });
2505 Vec::new()
2506 }
2507
2508 fn flush_current_message(&mut self) {
2509 let Some(mut buffered) = self.current_message.take() else {
2510 return;
2511 };
2512 let parts = std::mem::take(&mut self.current_parts);
2513 let mut canonical_parts = Vec::with_capacity(parts.len());
2514 for part in &parts {
2515 canonical_parts.push(part.part.clone());
2516 }
2517 buffered.search_text = search_text(&buffered.message, &canonical_parts);
2518 buffered.parts = parts;
2519 self.messages.push(buffered);
2520 }
2521}
2522
2523fn error_outcome(
2524 index: usize,
2525 kind: &'static str,
2526 pk: Value,
2527 message: &str,
2528 field: Option<&'static str>,
2529 reason_key: &'static str,
2530) -> RowOutcome {
2531 RowOutcome {
2532 index,
2533 kind,
2534 pk,
2535 status: OutcomeStatus::Error,
2536 error: Some(RowError {
2537 message: message.to_owned(),
2538 field,
2539 reason: None,
2540 reason_key: Some(reason_key),
2541 }),
2542 searchable: false,
2543 }
2544}
2545
2546fn error_outcomes_for_substream(
2551 session_index: usize,
2552 session: &Session,
2553 _messages: &[BufferedMessage],
2554 message: impl Into<String>,
2555 field: Option<&'static str>,
2556 reason_key: &'static str,
2557) -> Vec<RowOutcome> {
2558 let reason = field.map(|_| "immutable");
2559 vec![RowOutcome {
2560 index: session_index,
2561 kind: "session",
2562 pk: Value::String(session.id.clone()),
2563 status: OutcomeStatus::Error,
2564 error: Some(RowError {
2565 message: message.into(),
2566 field,
2567 reason,
2568 reason_key: Some(reason_key),
2569 }),
2570 searchable: false,
2571 }]
2572}
2573
2574fn success_outcomes_for_substream(
2580 session_index: usize,
2581 session: &Session,
2582 messages: &[BufferedMessage],
2583 existing_sessions: &std::collections::HashMap<String, Session>,
2584 existing_message_pks: &HashSet<(String, String)>,
2585 existing_part_pks: &HashSet<(String, String, String)>,
2586 counts: &mut BatchCounts,
2587) -> Vec<RowOutcome> {
2588 let session_was_present = existing_sessions.contains_key(&session.id);
2589 let session_status = if session_was_present {
2590 counts.sessions_matched += 1;
2591 UpsertStatus::Matched
2592 } else {
2593 counts.sessions_inserted += 1;
2594 UpsertStatus::Inserted
2595 };
2596
2597 let mut outcomes = Vec::with_capacity(1 + messages.len());
2598 outcomes.push(success_outcome(
2599 session_index,
2600 "session",
2601 Value::String(session.id.clone()),
2602 session_status,
2603 false,
2604 ));
2605 for buffered in messages {
2606 let key = (
2607 buffered.message.session_id().to_owned(),
2608 buffered.message.id().to_owned(),
2609 );
2610 let searchable = buffered.search_text.is_some();
2611 let message_status = if existing_message_pks.contains(&key) {
2612 counts.messages_matched_total += 1;
2613 if searchable {
2614 counts.messages_matched_searchable += 1;
2615 }
2616 UpsertStatus::Matched
2617 } else {
2618 counts.messages_inserted_total += 1;
2619 if searchable {
2620 counts.messages_inserted_searchable += 1;
2621 }
2622 UpsertStatus::Inserted
2623 };
2624 let pk = Value::Array(vec![Value::String(key.0), Value::String(key.1)]);
2625 outcomes.push(success_outcome(
2626 buffered.index,
2627 "message",
2628 pk,
2629 message_status,
2630 searchable,
2631 ));
2632 for part in &buffered.parts {
2633 let part_key = (
2634 part.part.session_id.clone(),
2635 part.part.message_id.clone(),
2636 part.part.id.clone(),
2637 );
2638 let part_status = if existing_part_pks.contains(&part_key) {
2639 counts.parts_matched += 1;
2640 UpsertStatus::Matched
2641 } else {
2642 counts.parts_inserted += 1;
2643 UpsertStatus::Inserted
2644 };
2645 let part_pk = Value::Array(vec![
2646 Value::String(part_key.0),
2647 Value::String(part_key.1),
2648 Value::String(part_key.2),
2649 ]);
2650 outcomes.push(success_outcome(
2651 part.index,
2652 "part",
2653 part_pk,
2654 part_status,
2655 false,
2656 ));
2657 }
2658 }
2659 outcomes
2660}
2661
2662fn success_outcome(
2663 index: usize,
2664 kind: &'static str,
2665 pk: Value,
2666 status: UpsertStatus,
2667 searchable: bool,
2668) -> RowOutcome {
2669 let status = match status {
2670 UpsertStatus::Inserted => OutcomeStatus::Inserted,
2671 UpsertStatus::Matched => OutcomeStatus::Matched,
2672 };
2673 RowOutcome {
2674 index,
2675 kind,
2676 pk,
2677 status,
2678 error: None,
2679 searchable,
2680 }
2681}
2682
/// Errors raised while validating an incoming session against the stored one.
#[derive(Debug, Clone, PartialEq, Eq)]
enum IngestError {
    /// An incoming session tried to change a field that must stay fixed once stored.
    ImmutableField {
        /// Name of the field that differs.
        field: &'static str,
        /// Id of the session being ingested.
        session_id: String,
        /// Value already stored.
        stored: String,
        /// Value the incoming session carried.
        attempted: String,
    },
}

impl std::fmt::Display for IngestError {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Single-variant enum: this pattern is irrefutable and stops compiling if a variant is
        // added, so new variants cannot silently go unformatted.
        let Self::ImmutableField {
            field,
            session_id,
            stored,
            attempted,
        } = self;
        write!(
            formatter,
            "session {session_id} {field} is immutable: stored {stored:?}, attempted {attempted:?}",
        )
    }
}

impl std::error::Error for IngestError {}
2714
2715fn ensure_immutable_match(
2719 existing: &Session,
2720 incoming: &Session,
2721) -> std::result::Result<(), IngestError> {
2722 if existing.source_agent != incoming.source_agent {
2723 return Err(IngestError::ImmutableField {
2724 field: "source_agent",
2725 session_id: incoming.id.clone(),
2726 stored: existing.source_agent.clone(),
2727 attempted: incoming.source_agent.clone(),
2728 });
2729 }
2730 if existing.project != incoming.project {
2731 return Err(IngestError::ImmutableField {
2732 field: "project",
2733 session_id: incoming.id.clone(),
2734 stored: (*existing.project).clone(),
2735 attempted: (*incoming.project).clone(),
2736 });
2737 }
2738 Ok(())
2739}
2740
2741pub fn search_text(message: &Message, parts: &[Part]) -> Option<String> {
2742 use crate::wire::Provenance;
2743 let mut chunks: Vec<String> = Vec::new();
2744 for part in parts {
2745 if part.provenance != Provenance::Conversational {
2748 continue;
2749 }
2750 match (message.role(), &part.kind) {
2751 (Role::User | Role::Assistant, PartKind::Text { text }) => {
2752 if let Some(text) = text {
2753 chunks.push(text.to_string());
2754 }
2755 }
2756 (
2757 Role::User | Role::Assistant,
2758 PartKind::File {
2759 media_type,
2760 file_name,
2761 data,
2762 },
2763 ) => {
2764 if let Some(file_name) = file_name {
2765 chunks.push(file_name.clone());
2766 }
2767 if let Some(media_type) = media_type {
2768 chunks.push(media_type.clone());
2769 }
2770 if let FileData::Url(uri) = data {
2771 chunks.push(uri.clone());
2772 }
2773 }
2774 (
2775 Role::System | Role::Tool,
2776 PartKind::Text { .. }
2777 | PartKind::Reasoning { .. }
2778 | PartKind::File { .. }
2779 | PartKind::ToolCall { .. }
2780 | PartKind::ToolResult { .. }
2781 | PartKind::ToolApprovalRequest { .. }
2782 | PartKind::ToolApprovalResponse { .. },
2783 )
2784 | (
2785 Role::User | Role::Assistant,
2786 PartKind::Reasoning { .. }
2787 | PartKind::ToolCall { .. }
2788 | PartKind::ToolResult { .. }
2789 | PartKind::ToolApprovalRequest { .. }
2790 | PartKind::ToolApprovalResponse { .. },
2791 ) => {}
2792 }
2793 }
2794
2795 let text = chunks
2796 .into_iter()
2797 .filter(|chunk| !chunk.trim().is_empty())
2798 .collect::<Vec<_>>()
2799 .join("\n");
2800 if text.is_empty() { None } else { Some(text) }
2801}
2802
/// Owned string newtype used for a message's search text.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SearchText(String);

impl SearchText {
    /// Borrows the wrapped text.
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }

    /// Consumes the wrapper and returns the owned text.
    pub fn into_inner(self) -> String {
        let Self(inner) = self;
        inner
    }
}

impl AsRef<str> for SearchText {
    fn as_ref(&self) -> &str {
        self.as_str()
    }
}
2822
/// A message together with its parts.
#[derive(Debug, Clone, PartialEq)]
pub struct MessageWithParts {
    /// The message itself.
    pub message: Message,
    /// Parts carried alongside `message`.
    pub parts: Vec<Part>,
}
2828
/// A session together with its messages, each bundled with its parts.
#[derive(Debug, Clone, PartialEq)]
pub struct SessionWithMessages {
    /// The session itself.
    pub session: Session,
    /// Messages carried alongside `session`.
    pub messages: Vec<MessageWithParts>,
}
2834
/// Parameters for viewing a session.
///
/// NOTE(review): the field meanings below are inferred from their names; confirm against the
/// code that consumes this struct.
#[derive(Debug, Clone)]
pub struct SessionViewParams<'a> {
    /// Requested response mode.
    pub mode: ResponseMode,
    /// Optional id to resume after (presumably a pagination cursor).
    pub after_id: Option<&'a str>,
    /// Upper bound on the number of items (presumably per page).
    pub limit: usize,
    /// Byte budget (presumably for the returned page).
    pub budget_bytes: usize,
    /// Selector of type [`SessionFrom`].
    pub session_from: SessionFrom,
}
2843
/// Parameters for viewing a single message.
///
/// NOTE(review): the field meanings below are inferred from their names; confirm against the
/// code that consumes this struct.
#[derive(Debug, Clone)]
pub struct MessageViewParams<'a> {
    /// Amount of surrounding context requested (presumably a count of neighbouring messages).
    pub context_depth: usize,
    /// Optional id to resume after (presumably a pagination cursor).
    pub after_id: Option<&'a str>,
    /// Upper bound on the number of items (presumably per page).
    pub limit: usize,
    /// Byte budget (presumably for the returned page).
    pub budget_bytes: usize,
}
2851
/// Three-way result of a lookup that yields a `T` on success.
#[derive(Debug, Clone, PartialEq)]
pub enum GetLookup<T> {
    /// Nothing was found.
    NotFound,
    /// An `after_id` was supplied that could not be resolved (presumably an unknown pagination
    /// cursor — confirm at the producing call site).
    UnknownAfterId,
    /// The lookup succeeded with the contained value.
    Found(T),
}
2863
/// One page of a session's messages.
#[derive(Debug, Clone, PartialEq)]
pub struct SessionPage {
    /// The session the page belongs to.
    pub session: Session,
    /// Messages included in this page.
    pub messages: Vec<RetrievedMessage>,
    /// Count of messages not included (presumably those still available after this page).
    pub messages_remaining: usize,
}
2873
/// One page of a single message's view: the target message, its parts and its siblings.
#[derive(Debug, Clone, PartialEq)]
pub struct MessagePage {
    /// The session the target message belongs to.
    pub session: Session,
    /// The message being viewed.
    pub target: RetrievedMessage,
    /// Parts of the target included in this page.
    pub target_parts: Vec<Part>,
    /// Count of target parts not included (presumably those still available after this page).
    pub target_parts_remaining: usize,
    /// Other messages returned alongside the target (presumably its surrounding context).
    pub siblings: Vec<RetrievedMessage>,
}
2885
/// A message as returned by retrieval, with optional text fields and its parts.
#[derive(Debug, Clone, PartialEq)]
pub struct RetrievedMessage {
    /// Message id.
    pub id: String,
    /// Role of the message author.
    pub role: Role,
    /// Message timestamp in UTC.
    pub timestamp: DateTime<Utc>,
    /// Optional text (presumably the message's search text — confirm at the producer).
    pub text: Option<String>,
    /// Optional content (presumably the stored `content` column — confirm at the producer).
    pub content: Option<String>,
    /// Parts attached to the message.
    pub parts: Vec<Part>,
}
2895
/// Private row shape with the same scalar fields as [`RetrievedMessage`], minus the parts.
#[derive(Debug, Clone)]
struct ScanRow {
    /// Message id.
    id: String,
    /// Role of the message author.
    role: Role,
    /// Message timestamp in UTC.
    timestamp: DateTime<Utc>,
    /// Optional text (presumably the message's search text — confirm at the producer).
    text: Option<String>,
    /// Optional content (presumably the stored `content` column — confirm at the producer).
    content: Option<String>,
}
2904
/// A message row identified by session and message id that always carries search text.
#[derive(Debug, Clone)]
pub struct ConversationalRow {
    /// Id of the owning session.
    pub session_id: String,
    /// Id of the message.
    pub message_id: String,
    /// Role of the message author.
    pub role: Role,
    /// Message timestamp in UTC.
    pub timestamp: DateTime<Utc>,
    /// Search text of the message.
    pub text: SearchText,
}
2915
/// Returns how many leading `items` fit on one page.
///
/// At most `limit` items are considered, with `limit` clamped to `1..=1000`. Items are admitted
/// in order while their accumulated `size` stays within `budget_bytes`; the first item is always
/// admitted, so a non-empty input yields at least one. Size accumulation saturates.
fn page_by<T>(items: &[T], limit: usize, budget_bytes: usize, size: impl Fn(&T) -> usize) -> usize {
    let window = limit.clamp(1, 1000);
    let mut spent = 0usize;
    items
        .iter()
        .take(window)
        .enumerate()
        .take_while(|&(position, item)| {
            let projected = spent.saturating_add(size(item));
            let fits = position == 0 || projected <= budget_bytes;
            if fits {
                spent = projected;
            }
            fits
        })
        .count()
}
2934
2935fn role_from_str(value: &str) -> Result<Role> {
2936 match value {
2937 "system" => Ok(Role::System),
2938 "user" => Ok(Role::User),
2939 "assistant" => Ok(Role::Assistant),
2940 "tool" => Ok(Role::Tool),
2941 other => anyhow::bail!("unknown message role {other}"),
2942 }
2943}
2944
/// Scalar indices declared for the messages table, as `(column, index kind, index name)`.
const MESSAGE_SCALAR_INDICES: &[(&str, BuiltinIndexType, &str)] = &[
    ("project", BuiltinIndexType::BTree, "messages_project_btree"),
    (
        "session_id",
        BuiltinIndexType::BTree,
        "messages_session_id_btree",
    ),
    (
        "timestamp",
        BuiltinIndexType::BTree,
        "messages_timestamp_btree",
    ),
    (
        "source_agent",
        BuiltinIndexType::Bitmap,
        "messages_source_agent_bitmap",
    ),
    ("role", BuiltinIndexType::Bitmap, "messages_role_bitmap"),
];

/// Scalar indices declared for the parts table, as `(column, index kind, index name)`.
const PARTS_SCALAR_INDICES: &[(&str, BuiltinIndexType, &str)] = &[
    (
        "session_id",
        BuiltinIndexType::BTree,
        "parts_session_id_btree",
    ),
    (
        "message_id",
        BuiltinIndexType::BTree,
        "parts_message_id_btree",
    ),
];

/// Scalar indices declared for the sessions table, as `(column, index kind, index name)`.
const SESSIONS_SCALAR_INDICES: &[(&str, BuiltinIndexType, &str)] =
    &[("id", BuiltinIndexType::BTree, "sessions_id_btree")];
2991
2992fn in_predicate(column: &'static str, values: &[String]) -> Predicate {
2993 Predicate::In(
2994 column,
2995 values.iter().cloned().map(ScalarValue::String).collect(),
2996 )
2997}
2998
2999fn embedded_scope(filter: &Predicate) -> Predicate {
3004 Predicate::And(vec![Predicate::IsNotNull("vector"), filter.clone()])
3005}
3006
3007fn statuses_from_inserted(total: usize, inserted_rows: u64) -> Vec<UpsertStatus> {
3008 let inserted = usize::try_from(inserted_rows)
3009 .unwrap_or(usize::MAX)
3010 .min(total);
3011 let mut statuses = Vec::with_capacity(total);
3012 statuses.extend(std::iter::repeat_n(UpsertStatus::Inserted, inserted));
3013 statuses.extend(std::iter::repeat_n(
3014 UpsertStatus::Matched,
3015 total.saturating_sub(inserted),
3016 ));
3017 statuses
3018}
3019
/// Identifier string for sessions (presumably the table name — confirm at call sites).
pub(crate) const SESSIONS: &str = "sessions";
/// Identifier string for messages (presumably the table name — confirm at call sites).
pub(crate) const MESSAGES: &str = "messages";
/// Identifier string for parts (presumably the table name — confirm at call sites).
pub(crate) const PARTS: &str = "parts";

/// Name of the full-text index declared on the messages `search_text` column.
pub const MESSAGES_FTS_INDEX: &str = "messages_search_text_fts";

/// Name of the vector index declared on the messages `vector` column.
pub const MESSAGES_VECTOR_INDEX: &str = "messages_vector_ivfpq";

/// `num_bits` passed to the IVF-PQ vector index parameters.
const IVF_PQ_NUM_BITS: u8 = 8;
/// Divisor applied to the embedding dimension to obtain the IVF-PQ `sub_vectors` count.
const IVF_PQ_SUB_VECTOR_STRIDE: usize = 8;
/// `max_iters` passed to the IVF-PQ vector index parameters.
const IVF_PQ_MAX_ITERS: usize = 15;

/// Lower n-gram bound passed to the full-text index parameters.
const FTS_NGRAM_MIN: u32 = 3;
/// Upper n-gram bound passed to the full-text index parameters.
const FTS_NGRAM_MAX: u32 = 5;
3049
3050pub fn pond_index_intents() -> IndexIntents {
3053 pond_index_intents_with_vector_threshold(VECTOR_INDEX_ACTIVATION_ROWS)
3054}
3055
3056pub(crate) fn pond_index_intents_with_vector_threshold(vector_threshold: usize) -> IndexIntents {
3060 let mut messages = Vec::with_capacity(MESSAGE_SCALAR_INDICES.len() + 2);
3061 messages.push(IndexIntent {
3062 name: MESSAGES_FTS_INDEX,
3063 column: "search_text",
3064 trigger: IndexTrigger::OnAnyRows,
3065 params: IndexParamsKind::InvertedFtsNgram {
3066 min: FTS_NGRAM_MIN,
3067 max: FTS_NGRAM_MAX,
3068 },
3069 });
3070 for (column, kind, name) in MESSAGE_SCALAR_INDICES {
3071 messages.push(IndexIntent {
3072 name,
3073 column,
3074 trigger: IndexTrigger::OnAnyRows,
3075 params: IndexParamsKind::Scalar(kind.clone()),
3076 });
3077 }
3078 messages.push(IndexIntent {
3079 name: MESSAGES_VECTOR_INDEX,
3080 column: "vector",
3081 trigger: IndexTrigger::OnNonNullCount {
3082 column: "vector",
3083 threshold: vector_threshold,
3084 },
3085 params: IndexParamsKind::IvfPqCosine {
3086 sub_vectors: embedding_dim() / IVF_PQ_SUB_VECTOR_STRIDE,
3087 num_bits: IVF_PQ_NUM_BITS,
3088 max_iters: IVF_PQ_MAX_ITERS,
3089 },
3090 });
3091 let parts = PARTS_SCALAR_INDICES
3092 .iter()
3093 .map(|(column, kind, name)| IndexIntent {
3094 name,
3095 column,
3096 trigger: IndexTrigger::OnAnyRows,
3097 params: IndexParamsKind::Scalar(kind.clone()),
3098 })
3099 .collect();
3100 let sessions = SESSIONS_SCALAR_INDICES
3101 .iter()
3102 .map(|(column, kind, name)| IndexIntent {
3103 name,
3104 column,
3105 trigger: IndexTrigger::OnAnyRows,
3106 params: IndexParamsKind::Scalar(kind.clone()),
3107 })
3108 .collect();
3109 IndexIntents {
3110 sessions,
3111 messages,
3112 parts,
3113 }
3114}
3115
/// Embedding dimension used when none has been set at runtime.
pub const DEFAULT_EMBEDDING_DIM: usize = 384;

/// Process-wide embedding dimension; written at most once.
static EMBEDDING_DIM_RUNTIME: std::sync::OnceLock<usize> = std::sync::OnceLock::new();

/// Returns the runtime embedding dimension, or [`DEFAULT_EMBEDDING_DIM`] if none was set.
pub fn embedding_dim() -> usize {
    match EMBEDDING_DIM_RUNTIME.get() {
        Some(&dim) => dim,
        None => DEFAULT_EMBEDDING_DIM,
    }
}

/// Sets the runtime embedding dimension. Only the first call takes effect; later calls are
/// ignored.
pub fn init_embedding_dim(dim: usize) {
    // `set` fails only when a value is already stored, which is the "ignore" case.
    let _ = EMBEDDING_DIM_RUNTIME.set(dim);
}
3141
3142pub(crate) fn write_params_for_create() -> WriteParams {
3149 WriteParams {
3150 data_storage_version: Some(LanceFileVersion::V2_1),
3151 enable_v2_manifest_paths: true,
3152 enable_stable_row_ids: true,
3153 auto_cleanup: Some(AutoCleanupParams {
3154 interval: 20,
3155 older_than: chrono::TimeDelta::days(1),
3156 }),
3157 skip_auto_cleanup: true,
3158 ..WriteParams::default()
3159 }
3160}
3161
3162fn export_schema(table: Table) -> Arc<Schema> {
3163 match table {
3164 Table::Sessions => session_schema(),
3165 Table::Messages => message_schema(),
3166 Table::Parts => part_schema(),
3167 }
3168}
3169
3170fn ensure_schema_matches_archive(dataset: &Dataset, table: Table) -> Result<()> {
3171 let expected = export_schema(table);
3172 let actual = lance::deps::arrow_schema::Schema::from(dataset.schema());
3173 let actual_names: Vec<_> = actual.fields().iter().map(|field| field.name()).collect();
3174 let expected_names: Vec<_> = expected.fields().iter().map(|field| field.name()).collect();
3175 if actual_names != expected_names {
3176 anyhow::bail!(
3177 "{} archive table has columns {actual_names:?} but this pond build expects {expected_names:?}",
3178 table.as_str(),
3179 );
3180 }
3181 Ok(())
3182}
3183
3184async fn open_archive_table(table: Table, source: &Path) -> Result<Dataset> {
3185 let source_uri = source
3186 .to_str()
3187 .with_context(|| format!("archive path is not UTF-8: {}", source.display()))?;
3188 let dataset = Dataset::open(source_uri)
3189 .await
3190 .with_context(|| format!("failed to open {} archive table", table.as_str()))?;
3191 ensure_schema_matches_archive(&dataset, table)?;
3192 Ok(dataset)
3193}
3194
3195pub(crate) fn session_schema() -> Arc<Schema> {
3196 Arc::new(Schema::new(vec![
3197 primary_field("id", DataType::Utf8, false),
3198 Field::new("parent_session_id", DataType::Utf8, true),
3199 Field::new("parent_message_id", DataType::Utf8, true),
3200 Field::new("source_agent", DataType::Utf8, false),
3201 Field::new(
3202 "created_at",
3203 DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
3204 false,
3205 ),
3206 Field::new("project", DataType::Utf8, false),
3207 json_field("options", false),
3208 ]))
3209}
3210
3211pub(crate) fn message_schema() -> Arc<Schema> {
3212 Arc::new(Schema::new(vec![
3213 primary_field("session_id", DataType::Utf8, false),
3214 primary_field("id", DataType::Utf8, false),
3215 Field::new(
3216 "timestamp",
3217 DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
3218 false,
3219 ),
3220 Field::new("role", DataType::Utf8, false),
3221 Field::new("source_agent", DataType::Utf8, false),
3222 Field::new("project", DataType::Utf8, false),
3223 Field::new("content", DataType::Utf8, true),
3224 Field::new("search_text", DataType::Utf8, true),
3225 Field::new("vector", embedding_vector_type(), true),
3228 Field::new("embedding_model", DataType::Utf8, true),
3229 json_field("options", false),
3230 ]))
3231}
3232
3233pub(crate) fn part_schema() -> Arc<Schema> {
3234 Arc::new(Schema::new(vec![
3235 primary_field("session_id", DataType::Utf8, false),
3236 primary_field("message_id", DataType::Utf8, false),
3237 primary_field("id", DataType::Utf8, false),
3238 Field::new("ordinal", DataType::Int32, false),
3239 Field::new("type", DataType::Utf8, false),
3240 Field::new("provenance", DataType::Utf8, false),
3243 json_field("variant_data", false),
3244 legacy_blob_field("data", true),
3245 json_field("options", false),
3246 ]))
3247}
3248
3249pub(crate) fn empty_batch(schema: Arc<Schema>) -> Result<RecordBatch> {
3250 let arrays = schema
3251 .fields()
3252 .iter()
3253 .map(|field| lance::deps::arrow_array::new_empty_array(field.data_type()))
3254 .collect();
3255 RecordBatch::try_new(schema, arrays).context("failed to build empty Lance batch")
3256}
3257
3258pub(crate) fn empty_reader(
3259 schema: Arc<Schema>,
3260) -> Result<
3261 RecordBatchIterator<
3262 std::vec::IntoIter<Result<RecordBatch, lance::deps::arrow_schema::ArrowError>>,
3263 >,
3264> {
3265 let batch = empty_batch(schema.clone())?;
3266 Ok(RecordBatchIterator::new(
3267 vec![Ok(batch)].into_iter(),
3268 schema,
3269 ))
3270}
3271
/// Borrowed inputs for one row of a messages record batch.
pub(crate) struct MessageBatchRow<'a> {
    /// The message supplying the id, session id, timestamp, role, system content and options.
    pub message: &'a Message,
    /// Value written to the `source_agent` column.
    pub source_agent: &'a str,
    /// Value written to the `project` column.
    pub project: &'a str,
    /// Value written to the nullable `search_text` column.
    pub search_text: Option<&'a str>,
}
3278
3279fn embedding_vector_type() -> DataType {
3285 DataType::FixedSizeList(
3286 Arc::new(Field::new("item", DataType::Float16, true)),
3287 embedding_dim() as i32,
3288 )
3289}
3290
3291fn embedding_update_schema() -> Arc<Schema> {
3295 Arc::new(Schema::new(vec![
3296 primary_field("session_id", DataType::Utf8, false),
3297 primary_field("id", DataType::Utf8, false),
3298 Field::new("vector", embedding_vector_type(), true),
3299 Field::new("embedding_model", DataType::Utf8, true),
3300 ]))
3301}
3302
3303pub(crate) fn embedding_update_batch(rows: &[EmbeddedMessage]) -> Result<RecordBatch> {
3306 let dim = embedding_dim();
3307 let mut flat = Vec::with_capacity(rows.len() * dim);
3308 for row in rows {
3309 if row.vector.len() != dim {
3310 anyhow::bail!(
3311 "embedding for message {} has dim {}, expected {dim}",
3312 row.id,
3313 row.vector.len(),
3314 );
3315 }
3316 flat.extend(row.vector.iter().map(|value| half::f16::from_f32(*value)));
3317 }
3318 let values = Float16Array::from(flat);
3319 let item_field = Arc::new(Field::new("item", DataType::Float16, true));
3320 let vectors = FixedSizeListArray::try_new(item_field, dim as i32, Arc::new(values), None)
3321 .context("failed to build embedding vector column")?;
3322
3323 RecordBatch::try_new(
3324 embedding_update_schema(),
3325 vec![
3326 Arc::new(StringArray::from(
3327 rows.iter()
3328 .map(|row| row.session_id.as_str())
3329 .collect::<Vec<_>>(),
3330 )),
3331 Arc::new(StringArray::from(
3332 rows.iter().map(|row| row.id.as_str()).collect::<Vec<_>>(),
3333 )),
3334 Arc::new(vectors),
3335 Arc::new(StringArray::from(vec![embed::model_id(); rows.len()])),
3336 ],
3337 )
3338 .context("failed to build embedding update batch")
3339}
3340
/// Byte ceiling applied both to a single text cell and to the summed cell bytes of one chunk.
const COLUMN_BYTE_BUDGET: usize = 1 << 30;

/// Splits rows into contiguous index ranges whose summed byte sizes stay within
/// [`COLUMN_BYTE_BUDGET`].
///
/// `cells[i]` is the byte size attributed to row `i`. A row larger than the budget still gets a
/// chunk of its own, since every chunk holds at least one row. Returns no ranges for empty
/// input.
///
/// Sizes are accumulated with saturating arithmetic, so pathological inputs cannot overflow
/// `usize` (which would panic in debug builds and wrap into wrong chunk boundaries in release).
fn chunk_ranges(cells: &[usize]) -> Vec<std::ops::Range<usize>> {
    let mut chunks = Vec::new();
    let mut start = 0usize;
    let mut running = 0usize;
    for (index, &row) in cells.iter().enumerate() {
        // Close the current chunk before this row would push it over budget, but never emit an
        // empty chunk.
        if index > start && running.saturating_add(row) > COLUMN_BYTE_BUDGET {
            chunks.push(start..index);
            start = index;
            running = 0;
        }
        running = running.saturating_add(row);
    }
    if start < cells.len() {
        chunks.push(start..cells.len());
    }
    chunks
}
3368
3369fn guard_cell(table: &str, pk: &str, bytes: usize) -> Result<()> {
3370 if bytes >= COLUMN_BYTE_BUDGET {
3371 anyhow::bail!(
3372 "{table} row {pk}: a {bytes}-byte text cell meets the per-cell ceiling and would \
3373 overflow Arrow's i32 offset buffer"
3374 );
3375 }
3376 Ok(())
3377}
3378
3379async fn merge_insert_chunks(
3380 handle: &Handle,
3381 table: Table,
3382 batches: Vec<RecordBatch>,
3383) -> Result<u64> {
3384 let mut inserted = 0u64;
3385 for batch in batches {
3386 let rows = batch.num_rows();
3387 inserted += handle.merge_insert(table, batch, rows).await?;
3388 }
3389 Ok(inserted)
3390}
3391
3392pub(crate) fn sessions_batches(sessions: &[Session]) -> Result<Vec<RecordBatch>> {
3393 let options = sessions
3394 .iter()
3395 .map(|session| json_bytes(&session.options))
3396 .collect::<Result<Vec<_>>>()?;
3397 let mut cells = Vec::with_capacity(sessions.len());
3398 for (session, encoded) in sessions.iter().zip(&options) {
3399 let columns = [
3400 session.id.len(),
3401 session.parent_session_id.as_deref().map_or(0, str::len),
3402 session.parent_message_id.as_deref().map_or(0, str::len),
3403 session.source_agent.len(),
3404 session.project.as_str().len(),
3405 encoded.len(),
3406 ];
3407 for bytes in columns {
3408 guard_cell("sessions", &session.id, bytes)?;
3409 }
3410 cells.push(columns.iter().sum());
3411 }
3412 chunk_ranges(&cells)
3413 .into_iter()
3414 .map(|range| sessions_chunk(&sessions[range.clone()], &options[range]))
3415 .collect()
3416}
3417
3418fn sessions_chunk(sessions: &[Session], options: &[Vec<u8>]) -> Result<RecordBatch> {
3419 let schema = session_schema();
3420 RecordBatch::try_new(
3421 schema.clone(),
3422 vec![
3423 Arc::new(StringArray::from(
3424 sessions
3425 .iter()
3426 .map(|session| session.id.as_str())
3427 .collect::<Vec<_>>(),
3428 )),
3429 Arc::new(StringArray::from(
3430 sessions
3431 .iter()
3432 .map(|session| session.parent_session_id.as_deref())
3433 .collect::<Vec<_>>(),
3434 )),
3435 Arc::new(StringArray::from(
3436 sessions
3437 .iter()
3438 .map(|session| session.parent_message_id.as_deref())
3439 .collect::<Vec<_>>(),
3440 )),
3441 Arc::new(StringArray::from(
3442 sessions
3443 .iter()
3444 .map(|session| session.source_agent.as_str())
3445 .collect::<Vec<_>>(),
3446 )),
3447 Arc::new(
3448 TimestampMicrosecondArray::from(
3449 sessions
3450 .iter()
3451 .map(|session| micros(session.created_at))
3452 .collect::<Vec<_>>(),
3453 )
3454 .with_timezone("UTC"),
3455 ),
3456 Arc::new(StringArray::from(
3457 sessions
3458 .iter()
3459 .map(|session| session.project.as_str())
3460 .collect::<Vec<_>>(),
3461 )),
3462 Arc::new(LargeBinaryArray::from_iter_values(
3463 options.iter().map(Vec::as_slice),
3464 )),
3465 ],
3466 )
3467 .context("failed to build session batch")
3468}
3469
3470pub(crate) fn messages_batches(rows: &[MessageBatchRow<'_>]) -> Result<Vec<RecordBatch>> {
3471 let options = rows
3472 .iter()
3473 .map(|row| json_bytes(row.message.options()))
3474 .collect::<Result<Vec<_>>>()?;
3475 let mut cells = Vec::with_capacity(rows.len());
3476 for (row, encoded) in rows.iter().zip(&options) {
3477 let columns = [
3478 row.message.session_id().len(),
3479 row.message.id().len(),
3480 row.message.role().as_str().len(),
3481 row.source_agent.len(),
3482 row.project.len(),
3483 row.message.system_content().map_or(0, str::len),
3484 row.search_text.map_or(0, str::len),
3485 encoded.len(),
3486 ];
3487 for bytes in columns {
3488 guard_cell("messages", row.message.id(), bytes)?;
3489 }
3490 cells.push(columns.iter().sum());
3491 }
3492 chunk_ranges(&cells)
3493 .into_iter()
3494 .map(|range| messages_chunk(&rows[range.clone()], &options[range]))
3495 .collect()
3496}
3497
3498fn messages_chunk(rows: &[MessageBatchRow<'_>], options: &[Vec<u8>]) -> Result<RecordBatch> {
3499 let schema = message_schema();
3500 RecordBatch::try_new(
3501 schema.clone(),
3502 vec![
3503 Arc::new(StringArray::from(
3504 rows.iter()
3505 .map(|row| row.message.session_id())
3506 .collect::<Vec<_>>(),
3507 )),
3508 Arc::new(StringArray::from(
3509 rows.iter().map(|row| row.message.id()).collect::<Vec<_>>(),
3510 )),
3511 Arc::new(
3512 TimestampMicrosecondArray::from(
3513 rows.iter()
3514 .map(|row| micros(row.message.timestamp()))
3515 .collect::<Vec<_>>(),
3516 )
3517 .with_timezone("UTC"),
3518 ),
3519 Arc::new(StringArray::from(
3520 rows.iter()
3521 .map(|row| row.message.role().as_str())
3522 .collect::<Vec<_>>(),
3523 )),
3524 Arc::new(StringArray::from(
3525 rows.iter().map(|row| row.source_agent).collect::<Vec<_>>(),
3526 )),
3527 Arc::new(StringArray::from(
3528 rows.iter().map(|row| row.project).collect::<Vec<_>>(),
3529 )),
3530 Arc::new(StringArray::from(
3531 rows.iter()
3532 .map(|row| row.message.system_content())
3533 .collect::<Vec<_>>(),
3534 )),
3535 Arc::new(StringArray::from(
3536 rows.iter().map(|row| row.search_text).collect::<Vec<_>>(),
3537 )),
3538 new_null_array(&embedding_vector_type(), rows.len()),
3542 new_null_array(&DataType::Utf8, rows.len()),
3543 Arc::new(LargeBinaryArray::from_iter_values(
3544 options.iter().map(Vec::as_slice),
3545 )),
3546 ],
3547 )
3548 .context("failed to build message batch")
3549}
3550
3551pub(crate) fn parts_batches(parts: &[Part]) -> Result<Vec<RecordBatch>> {
3552 let variant_data = parts
3553 .iter()
3554 .map(|part| part_variant_json(&part.kind))
3555 .collect::<Result<Vec<_>>>()?;
3556 let options = parts
3557 .iter()
3558 .map(|part| json_bytes(&part.options))
3559 .collect::<Result<Vec<_>>>()?;
3560 let mut cells = Vec::with_capacity(parts.len());
3561 for ((part, variant), encoded) in parts.iter().zip(&variant_data).zip(&options) {
3564 let columns = [
3565 part.session_id.len(),
3566 part.message_id.len(),
3567 part.id.len(),
3568 part.kind.type_name().len(),
3569 part.provenance.as_str().len(),
3570 variant.len(),
3571 encoded.len(),
3572 ];
3573 for bytes in columns {
3574 guard_cell("parts", &part.id, bytes)?;
3575 }
3576 cells.push(columns.iter().sum());
3577 }
3578 chunk_ranges(&cells)
3579 .into_iter()
3580 .map(|range| {
3581 parts_chunk(
3582 &parts[range.clone()],
3583 &variant_data[range.clone()],
3584 &options[range],
3585 )
3586 })
3587 .collect()
3588}
3589
3590fn parts_chunk(
3591 parts: &[Part],
3592 variant_data: &[Vec<u8>],
3593 options: &[Vec<u8>],
3594) -> Result<RecordBatch> {
3595 let schema = part_schema();
3596 let blob_payloads: Vec<Option<&[u8]>> = parts
3600 .iter()
3601 .map(|part| match &part.kind {
3602 PartKind::File { data, .. } => Some(match data {
3603 FileData::String(value) => value.as_bytes(),
3604 FileData::Bytes(value) => value.as_slice(),
3605 FileData::Url(value) => value.as_bytes(),
3606 }),
3607 PartKind::Text { .. }
3608 | PartKind::Reasoning { .. }
3609 | PartKind::ToolCall { .. }
3610 | PartKind::ToolResult { .. }
3611 | PartKind::ToolApprovalRequest { .. }
3612 | PartKind::ToolApprovalResponse { .. } => None,
3613 })
3614 .collect();
3615 let blob_array = LargeBinaryArray::from_iter(blob_payloads);
3616
3617 RecordBatch::try_new(
3618 schema.clone(),
3619 vec![
3620 Arc::new(StringArray::from(
3621 parts
3622 .iter()
3623 .map(|part| part.session_id.as_str())
3624 .collect::<Vec<_>>(),
3625 )),
3626 Arc::new(StringArray::from(
3627 parts
3628 .iter()
3629 .map(|part| part.message_id.as_str())
3630 .collect::<Vec<_>>(),
3631 )),
3632 Arc::new(StringArray::from(
3633 parts
3634 .iter()
3635 .map(|part| part.id.as_str())
3636 .collect::<Vec<_>>(),
3637 )),
3638 Arc::new(Int32Array::from(
3639 parts.iter().map(|part| part.ordinal).collect::<Vec<_>>(),
3640 )),
3641 Arc::new(StringArray::from(
3642 parts
3643 .iter()
3644 .map(|part| part.kind.type_name())
3645 .collect::<Vec<_>>(),
3646 )),
3647 Arc::new(StringArray::from(
3648 parts
3649 .iter()
3650 .map(|part| part.provenance.as_str())
3651 .collect::<Vec<_>>(),
3652 )),
3653 Arc::new(LargeBinaryArray::from_iter_values(
3654 variant_data.iter().map(Vec::as_slice),
3655 )),
3656 Arc::new(blob_array),
3657 Arc::new(LargeBinaryArray::from_iter_values(
3658 options.iter().map(Vec::as_slice),
3659 )),
3660 ],
3661 )
3662 .context("failed to build parts batch")
3663}
3664
3665pub(crate) fn session_from_batch(batch: &RecordBatch, row: usize) -> Result<Session> {
3666 Ok(Session {
3667 id: string(batch, "id", row)?.context("session id is null")?,
3668 parent_session_id: string(batch, "parent_session_id", row)?,
3669 parent_message_id: string(batch, "parent_message_id", row)?,
3670 source_agent: string(batch, "source_agent", row)?.context("source_agent is null")?,
3671 created_at: datetime(batch, "created_at", row)?,
3672 project: crate::adapter::Extracted::from_stored(
3673 string(batch, "project", row)?.context("project is null")?,
3674 ),
3675 options: json_parse(&json_column(batch, "options", row)?.context("options is null")?)?,
3676 })
3677}
3678
3679pub(crate) fn message_from_batch(batch: &RecordBatch, row: usize) -> Result<Message> {
3680 let id = string(batch, "id", row)?.context("message id is null")?;
3681 let session_id = string(batch, "session_id", row)?.context("message session_id is null")?;
3682 let timestamp = datetime(batch, "timestamp", row)?;
3683 let options =
3684 json_parse(&json_column(batch, "options", row)?.context("message options is null")?)?;
3685
3686 match string(batch, "role", row)?
3687 .context("message role is null")?
3688 .as_str()
3689 {
3690 "system" => Ok(Message::System {
3691 id,
3692 session_id,
3693 timestamp,
3694 content: string(batch, "content", row)?.map(crate::adapter::Extracted::from_stored),
3701 options,
3702 }),
3703 "user" => Ok(Message::User {
3704 id,
3705 session_id,
3706 timestamp,
3707 options,
3708 }),
3709 "assistant" => Ok(Message::Assistant {
3710 id,
3711 session_id,
3712 timestamp,
3713 options,
3714 }),
3715 "tool" => Ok(Message::Tool {
3716 id,
3717 session_id,
3718 timestamp,
3719 options,
3720 }),
3721 other => anyhow::bail!("unknown message role {other}"),
3722 }
3723}
3724
3725pub(crate) fn part_from_batch(
3726 batch: &RecordBatch,
3727 row: usize,
3728 file_data: Option<FileData>,
3729) -> Result<Part> {
3730 let type_name = string(batch, "type", row)?.context("part type is null")?;
3731 let variant_data = json_column(batch, "variant_data", row)?.context("variant_data is null")?;
3732 let provenance = string(batch, "provenance", row)?.context("part provenance is null")?;
3733 Ok(Part {
3734 session_id: string(batch, "session_id", row)?.context("part session_id is null")?,
3735 message_id: string(batch, "message_id", row)?.context("part message_id is null")?,
3736 id: string(batch, "id", row)?.context("part id is null")?,
3737 ordinal: int32(batch, "ordinal", row)?,
3738 provenance: provenance_from_str(&provenance)?,
3739 options: json_parse(&json_column(batch, "options", row)?.context("part options is null")?)?,
3740 kind: part_kind_from_json(&type_name, &variant_data, file_data)?,
3741 })
3742}
3743
3744fn provenance_from_str(value: &str) -> Result<crate::wire::Provenance> {
3745 match value {
3746 "conversational" => Ok(crate::wire::Provenance::Conversational),
3747 "injected" => Ok(crate::wire::Provenance::Injected),
3748 other => anyhow::bail!("unknown part provenance {other}"),
3749 }
3750}
3751
3752fn file_data_from_blob(variant_data: &[u8], bytes: &[u8]) -> Result<FileData> {
3753 let kind = file_data_kind(variant_data)?;
3754 match kind.as_str() {
3755 "string" => {
3756 let text = std::str::from_utf8(bytes)
3757 .context("file string payload is not UTF-8")?
3758 .to_owned();
3759 Ok(FileData::String(text))
3760 }
3761 "bytes" => Ok(FileData::Bytes(bytes.to_vec())),
3762 "url" => Ok(FileData::Url(
3763 std::str::from_utf8(bytes)
3764 .context("file URL payload is not UTF-8")?
3765 .to_owned(),
3766 )),
3767 other => anyhow::bail!("unknown file data_kind {other}"),
3768 }
3769}
3770
3771fn file_data_kind(variant_data: &[u8]) -> Result<String> {
3772 let value = json_parse::<Value>(variant_data)?;
3773 value
3774 .get("data_kind")
3775 .and_then(Value::as_str)
3776 .map(str::to_owned)
3777 .context("file part variant_data missing data_kind")
3778}
3779
3780fn uint64<'a>(batch: &'a RecordBatch, name: &str) -> Result<&'a UInt64Array> {
3781 batch
3782 .column_by_name(name)
3783 .with_context(|| format!("missing column {name}"))?
3784 .as_any()
3785 .downcast_ref::<UInt64Array>()
3786 .with_context(|| format!("column {name} is not UInt64"))
3787}
3788
3789pub(crate) fn string(batch: &RecordBatch, name: &str, row: usize) -> Result<Option<String>> {
3790 let array = batch
3791 .column_by_name(name)
3792 .with_context(|| format!("missing column {name}"))?
3793 .as_any()
3794 .downcast_ref::<StringArray>()
3795 .with_context(|| format!("column {name} is not Utf8"))?;
3796 if array.is_null(row) {
3797 Ok(None)
3798 } else {
3799 Ok(Some(array.value(row).to_owned()))
3800 }
3801}
3802
3803fn json_column(batch: &RecordBatch, name: &str, row: usize) -> Result<Option<Vec<u8>>> {
3804 let column = batch
3808 .column_by_name(name)
3809 .with_context(|| format!("missing column {name}"))?;
3810 if let Some(array) = column.as_any().downcast_ref::<LargeBinaryArray>() {
3811 return if array.is_null(row) {
3812 Ok(None)
3813 } else {
3814 Ok(Some(
3815 lance_arrow::json::decode_json(array.value(row)).into_bytes(),
3816 ))
3817 };
3818 }
3819 if let Some(array) = column.as_any().downcast_ref::<StringArray>() {
3820 return if array.is_null(row) {
3821 Ok(None)
3822 } else {
3823 Ok(Some(array.value(row).as_bytes().to_vec()))
3824 };
3825 }
3826 if let Some(array) = column.as_any().downcast_ref::<LargeStringArray>() {
3827 return if array.is_null(row) {
3828 Ok(None)
3829 } else {
3830 Ok(Some(array.value(row).as_bytes().to_vec()))
3831 };
3832 }
3833 anyhow::bail!("column {name} is not a JSON-compatible array")
3834}
3835
3836fn int32(batch: &RecordBatch, name: &str, row: usize) -> Result<i32> {
3837 let array = batch
3838 .column_by_name(name)
3839 .with_context(|| format!("missing column {name}"))?
3840 .as_any()
3841 .downcast_ref::<Int32Array>()
3842 .with_context(|| format!("column {name} is not Int32"))?;
3843 Ok(array.value(row))
3844}
3845
3846pub(crate) fn float32(batch: &RecordBatch, name: &str, row: usize) -> Result<f32> {
3847 let array = batch
3848 .column_by_name(name)
3849 .with_context(|| format!("missing column {name}"))?
3850 .as_any()
3851 .downcast_ref::<Float32Array>()
3852 .with_context(|| format!("column {name} is not Float32"))?;
3853 Ok(array.value(row))
3854}
3855
3856pub(crate) fn datetime(batch: &RecordBatch, name: &str, row: usize) -> Result<DateTime<Utc>> {
3857 let array = batch
3858 .column_by_name(name)
3859 .with_context(|| format!("missing column {name}"))?
3860 .as_any()
3861 .downcast_ref::<TimestampMicrosecondArray>()
3862 .with_context(|| format!("column {name} is not timestamp_micros"))?;
3863 Utc.timestamp_micros(array.value(row))
3864 .single()
3865 .context("timestamp is out of range")
3866}
3867
3868fn primary_field(name: &str, data_type: DataType, nullable: bool) -> Field {
3869 Field::new(name, data_type, nullable).with_metadata(
3870 [(
3871 "lance-schema:unenforced-primary-key".to_owned(),
3872 "true".to_owned(),
3873 )]
3874 .into(),
3875 )
3876}
3877
3878fn legacy_blob_field(name: &str, nullable: bool) -> Field {
3888 Field::new(name, DataType::LargeBinary, nullable).with_metadata(
3889 [(lance_arrow::BLOB_META_KEY.to_owned(), "true".to_owned())]
3890 .into_iter()
3891 .collect(),
3892 )
3893}
3894
/// Builds the Arrow field for a JSON column by delegating to
/// `lance_arrow::json::json_field` with the same `name` and `nullable` flag.
fn json_field(name: &str, nullable: bool) -> Field {
    lance_arrow::json::json_field(name, nullable)
}
3898
/// Converts a UTC timestamp to an `i64` microsecond count via
/// `DateTime::timestamp_micros`.
fn micros(timestamp: DateTime<Utc>) -> i64 {
    timestamp.timestamp_micros()
}
3902
3903fn json_bytes<T: Serialize>(value: &T) -> Result<Vec<u8>> {
3904 let text = serde_json::to_string(value).context("failed to serialize JSON field")?;
3912 lance_arrow::json::encode_json(&text)
3913 .map_err(|err| anyhow::anyhow!("failed to encode JSON field as JSONB: {err}"))
3914}
3915
3916fn json_parse<T: DeserializeOwned>(value: &[u8]) -> Result<T> {
3917 serde_json::from_slice(value).context("failed to parse JSON field")
3918}
3919
3920fn part_variant_json(kind: &PartKind) -> Result<Vec<u8>> {
3921 if let PartKind::File {
3922 media_type,
3923 file_name,
3924 data,
3925 } = kind
3926 {
3927 let data_kind = match data {
3928 FileData::String(_) => "string",
3929 FileData::Bytes(_) => "bytes",
3930 FileData::Url(_) => "url",
3931 };
3932 return json_bytes(&serde_json::json!({
3933 "media_type": media_type,
3934 "file_name": file_name,
3935 "data_kind": data_kind,
3936 }));
3937 }
3938 let value = serde_json::to_value(kind)?;
3939 let mut object = value
3940 .as_object()
3941 .cloned()
3942 .context("part variant did not serialize to an object")?;
3943 object.remove("type");
3944 json_bytes(&object)
3945}
3946
3947fn part_kind_from_json(
3948 type_name: &str,
3949 variant_data: &[u8],
3950 file_data: Option<FileData>,
3951) -> Result<PartKind> {
3952 let mut value = json_parse::<Value>(variant_data)?;
3953 let object = value
3954 .as_object_mut()
3955 .context("part variant data is not an object")?;
3956 object.insert("type".to_owned(), Value::String(type_name.to_owned()));
3957 if let Some(data) = file_data {
3958 object.remove("data_kind");
3959 object.insert("data".to_owned(), serde_json::to_value(data)?);
3960 }
3961 serde_json::from_value(value).context("failed to parse part kind")
3962}
3963
3964#[cfg(test)]
3965mod tests {
3966 #![allow(clippy::expect_used, clippy::unwrap_used)]
3967
3968 use super::*;
3969 use crate::{
3970 adapter::Extracted,
3971 handlers::ingest_events,
3972 wire::{FileData, Message, Part, PartKind, ProviderOptions, Session},
3973 };
3974 use chrono::Utc;
3975 use serde_json::json;
3976 use tempfile::TempDir;
3977
3978 fn synthetic_session(id: &str) -> Session {
3979 Session {
3980 id: id.to_owned(),
3981 parent_session_id: None,
3982 parent_message_id: None,
3983 source_agent: "claude-code".to_owned(),
3984 created_at: Utc::now(),
3985 project: crate::adapter::Extracted::from_test_value("/tmp/pond".to_owned()),
3986 options: ProviderOptions::new(),
3987 }
3988 }
3989
    /// Checks that `search_text` returns the text of a conversational part
    /// but `None` when the message's only part is injected.
    #[test]
    fn search_text_excludes_injected_parts() {
        use crate::wire::Provenance;
        let message = Message::User {
            id: "m1".to_owned(),
            session_id: "s1".to_owned(),
            timestamp: Utc::now(),
            options: ProviderOptions::new(),
        };
        // Builds a text part on message `m1` with the requested provenance.
        let text_part = |id: &str, text: &str, provenance: Provenance| Part {
            session_id: "s1".to_owned(),
            id: id.to_owned(),
            message_id: "m1".to_owned(),
            ordinal: 0,
            provenance,
            options: ProviderOptions::new(),
            kind: PartKind::Text {
                text: Some(Extracted::from_test_value(text.to_owned())),
            },
        };

        // A conversational part contributes its text verbatim.
        let conversational = search_text(
            &message,
            &[text_part(
                "p1",
                "real human prompt",
                Provenance::Conversational,
            )],
        );
        assert_eq!(conversational.as_deref(), Some("real human prompt"));

        // An injected part alone leaves the message without search text.
        let injected = search_text(
            &message,
            &[text_part(
                "p2",
                "<task-notification>...</task-notification>",
                Provenance::Injected,
            )],
        );
        assert!(
            injected.is_none(),
            "a message whose only part is injected has null search_text"
        );
    }
4036
    /// Pins how `chunk_ranges` groups item sizes against `COLUMN_BYTE_BUDGET`.
    #[test]
    fn chunk_ranges_splits_on_byte_budget() {
        // No sizes yield no ranges; small sizes share a single range.
        assert!(chunk_ranges(&[]).is_empty());
        assert_eq!(chunk_ranges(&[10, 10, 10]), vec![0..3]);

        // Any two of these sizes together exceed the budget, so each item is
        // expected in a range of its own.
        let two_thirds = COLUMN_BYTE_BUDGET * 2 / 3;
        assert_eq!(
            chunk_ranges(&[two_thirds, two_thirds, two_thirds]),
            vec![0..1, 1..2, 2..3],
        );

        // A single item larger than the budget is still emitted, alone, and
        // its small neighbours are not merged with it.
        assert_eq!(
            chunk_ranges(&[10, COLUMN_BYTE_BUDGET + 1, 10]),
            vec![0..1, 1..2, 2..3],
        );
    }
4054
    /// Pushes a part that references a message id which is never pushed and
    /// checks that only that event is rejected: the session and the later
    /// valid message/part pair are still counted by `row_counts`.
    #[tokio::test]
    async fn ordering_violation_drops_only_the_offending_event() -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let store = Store::open_local(temp.path()).await?;
        let session = synthetic_session("ordering");
        // References `missing-message`, which no event in this test supplies.
        let orphan_part = Part {
            session_id: session.id.clone(),
            id: "orphan-part".to_owned(),
            message_id: "missing-message".to_owned(),
            ordinal: 0,
            provenance: crate::wire::Provenance::Conversational,
            options: ProviderOptions::new(),
            kind: PartKind::Text {
                text: Some(Extracted::from_test_value("orphan".to_owned())),
            },
        };
        let valid_message = Message::User {
            id: "valid-message".to_owned(),
            session_id: session.id.clone(),
            timestamp: Utc::now(),
            options: ProviderOptions::new(),
        };
        let valid_part = Part {
            session_id: session.id.clone(),
            id: "valid-part".to_owned(),
            message_id: valid_message.id().to_owned(),
            ordinal: 0,
            provenance: crate::wire::Provenance::Conversational,
            options: ProviderOptions::new(),
            kind: PartKind::Text {
                text: Some(Extracted::from_test_value("kept".to_owned())),
            },
        };

        let mut validator = IngestValidator::default();
        validator
            .push(&store, 0, IngestEvent::Session(session.clone()))
            .await?;
        // The orphan part is pushed before any message event.
        let part_outcomes = validator
            .push(&store, 1, IngestEvent::Part(orphan_part))
            .await?;
        // The rejection is reported immediately as a single error outcome.
        assert_eq!(part_outcomes.len(), 1);
        assert_eq!(part_outcomes[0].kind, "part");
        assert_eq!(part_outcomes[0].status, OutcomeStatus::Error);
        assert!(
            part_outcomes[0]
                .error
                .as_ref()
                .map(|e| e.message.contains("part event appeared before a message"))
                .unwrap_or(false),
            "error message must explain the ordering violation: {part_outcomes:?}"
        );
        // The well-ordered message/part pair follows the rejected event.
        validator
            .push(&store, 2, IngestEvent::Message(valid_message))
            .await?;
        validator
            .push(&store, 3, IngestEvent::Part(valid_part))
            .await?;
        validator.finish(&store).await?;

        let (sessions, messages, parts) = store.row_counts().await?;
        assert_eq!(sessions, 1, "session committed despite the orphan part");
        assert_eq!(messages, 1, "valid message committed");
        assert_eq!(parts, 1, "valid part committed; the orphan was dropped");

        Ok(())
    }
4126
    /// Pushes two messages that share the id `message-1` and checks that the
    /// second is rejected with an error naming the id while the first is
    /// the only message counted after `finish`.
    #[tokio::test]
    async fn duplicate_message_id_drops_the_second_keeps_the_first() -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let store = Store::open_local(temp.path()).await?;
        let session = synthetic_session("duplicate-message");
        let first = Message::User {
            id: "message-1".to_owned(),
            session_id: session.id.clone(),
            timestamp: Utc::now(),
            options: ProviderOptions::new(),
        };
        // Same id as `first`, different role.
        let second = Message::Assistant {
            id: "message-1".to_owned(),
            session_id: session.id.clone(),
            timestamp: Utc::now(),
            options: ProviderOptions::new(),
        };

        let mut validator = IngestValidator::default();
        validator
            .push(&store, 0, IngestEvent::Session(session.clone()))
            .await?;
        validator
            .push(&store, 1, IngestEvent::Message(first))
            .await?;
        let dup_outcomes = validator
            .push(&store, 2, IngestEvent::Message(second))
            .await?;
        // The duplicate yields exactly one error outcome at push time.
        assert_eq!(dup_outcomes.len(), 1);
        assert_eq!(dup_outcomes[0].status, OutcomeStatus::Error);
        assert!(
            dup_outcomes[0]
                .error
                .as_ref()
                .map(|e| e.message.contains("duplicate message id message-1"))
                .unwrap_or(false),
            "duplicate-id rejection must name the offending id: {dup_outcomes:?}"
        );

        validator.finish(&store).await?;
        let (sessions, messages, _) = store.row_counts().await?;
        assert_eq!(sessions, 1, "session committed");
        assert_eq!(messages, 1, "only the first message committed");

        Ok(())
    }
4176
    /// Writes file parts covering all three `FileData` variants in two
    /// separate `upsert_parts` calls, then runs `optimize_indices` with
    /// `MaintenancePolicy::always_compact()`. The test passes when that
    /// call and `into_result` both succeed.
    #[tokio::test(flavor = "multi_thread")]
    async fn optimize_indices_compacts_parts_with_blob_column() -> anyhow::Result<()> {
        use crate::wire::{FileData, PartKind, Provenance};
        let temp = TempDir::new()?;
        let store = Store::open_local(temp.path()).await?;

        let session = synthetic_session("compact-blob");
        store
            .upsert_sessions(std::slice::from_ref(&session))
            .await?;

        // Builds a part `part-{idx}` on message `msg-{idx}` with the given kind.
        let make_part = |idx: usize, kind: PartKind| Part {
            session_id: session.id.clone(),
            message_id: format!("msg-{idx}"),
            id: format!("part-{idx}"),
            ordinal: 0,
            provenance: Provenance::Conversational,
            options: ProviderOptions::new(),
            kind,
        };

        // First write: a bytes payload and a string payload.
        let batch_a = vec![
            make_part(
                0,
                PartKind::File {
                    media_type: Some("text/plain".to_owned()),
                    file_name: Some("a.txt".to_owned()),
                    data: FileData::Bytes(b"alpha".to_vec()),
                },
            ),
            make_part(
                1,
                PartKind::File {
                    media_type: Some("text/plain".to_owned()),
                    file_name: Some("b.txt".to_owned()),
                    data: FileData::String("beta".to_owned()),
                },
            ),
        ];
        store.upsert_parts(&batch_a).await?;

        // Second write: a URL payload and another bytes payload.
        // NOTE(review): presumably two writes are used so compaction has
        // more than one fragment to merge — confirm against the store.
        let batch_b = vec![
            make_part(
                2,
                PartKind::File {
                    media_type: Some("application/octet-stream".to_owned()),
                    file_name: None,
                    data: FileData::Url("https://example.com/file".to_owned()),
                },
            ),
            make_part(
                3,
                PartKind::File {
                    media_type: Some("image/png".to_owned()),
                    file_name: Some("c.png".to_owned()),
                    data: FileData::Bytes(vec![0x89, 0x50, 0x4e, 0x47]),
                },
            ),
        ];
        store.upsert_parts(&batch_b).await?;

        store
            .optimize_indices(None, &MaintenancePolicy::always_compact())
            .await?
            .into_result()?;

        Ok(())
    }
4252
    /// Ingests a session, a message and one file part with a bytes payload,
    /// then checks that `get_session` returns a part equal to the original.
    #[tokio::test]
    async fn file_part_blob_v2_round_trips_through_get() -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let store = Store::open_local(temp.path()).await?;
        let session = synthetic_session("blob");
        let message = Message::User {
            id: "message-1".to_owned(),
            session_id: session.id.clone(),
            timestamp: Utc::now(),
            options: ProviderOptions::new(),
        };
        let part = Part {
            session_id: session.id.clone(),
            id: "part-1".to_owned(),
            message_id: message.id().to_owned(),
            ordinal: 0,
            provenance: crate::wire::Provenance::Conversational,
            options: ProviderOptions::new(),
            kind: PartKind::File {
                media_type: Some("text/plain".to_owned()),
                file_name: Some("payload.txt".to_owned()),
                data: FileData::Bytes(b"pond".to_vec()),
            },
        };

        let mut validator = IngestValidator::default();
        validator
            .push(&store, 0, IngestEvent::Session(session.clone()))
            .await?;
        validator
            .push(&store, 1, IngestEvent::Message(message.clone()))
            .await?;
        validator
            .push(&store, 2, IngestEvent::Part(part.clone()))
            .await?;
        validator.finish(&store).await?;

        let stored = store
            .get_session(&session.id)
            .await?
            .expect("session should exist");
        // Whole-part equality covers the file payload as well as the metadata.
        let stored_part = &stored.messages[0].parts[0];
        assert_eq!(stored_part, &part);

        Ok(())
    }
4299
4300 fn base_session() -> Session {
4311 Session {
4312 id: "01HXY00000000001".to_owned(),
4313 parent_session_id: None,
4314 parent_message_id: None,
4315 source_agent: "claude-code".to_owned(),
4316 created_at: Utc::now(),
4317 project: crate::adapter::Extracted::from_test_value("/home/me/proj".to_owned()),
4318 options: ProviderOptions::new(),
4319 }
4320 }
4321
4322 fn count_status(outcomes: &[RowOutcome], target: OutcomeStatus) -> usize {
4323 outcomes
4324 .iter()
4325 .filter(|outcome| outcome.status == target)
4326 .count()
4327 }
4328
    /// Ingests `base_session()` twice, the second time with an extra
    /// `title` option, and checks that the re-ingest reports no error and
    /// exactly one `Matched` outcome.
    #[tokio::test(flavor = "multi_thread")]
    async fn re_ingesting_a_session_with_unchanged_immutable_fields_is_idempotent()
    -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let store = Store::open_local(temp.path()).await?;

        let first = ingest_events(&store, vec![IngestEvent::Session(base_session())]).await?;
        assert_eq!(count_status(&first, OutcomeStatus::Inserted), 1);

        // Only `options` differs from the first ingest.
        let mut again = base_session();
        again.options.insert("title".to_owned(), json!("renamed"));
        let second = ingest_events(&store, vec![IngestEvent::Session(again)]).await?;
        assert_eq!(
            count_status(&second, OutcomeStatus::Error),
            0,
            "options is mutable; the re-ingest must not surface an error: {second:?}",
        );
        assert_eq!(
            count_status(&second, OutcomeStatus::Matched),
            1,
            "unchanged immutable fields must match-insert via merge_insert",
        );

        Ok(())
    }
4354
    /// Re-ingests `base_session()` with a different `source_agent` and
    /// checks that the outcome is an error tagged `source_agent` /
    /// `immutable`, and that the stored session keeps the original agent.
    #[tokio::test(flavor = "multi_thread")]
    async fn re_ingesting_with_changed_source_agent_is_rejected() -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let store = Store::open_local(temp.path()).await?;

        let first = ingest_events(&store, vec![IngestEvent::Session(base_session())]).await?;
        assert_eq!(count_status(&first, OutcomeStatus::Error), 0);

        let mut tampered = base_session();
        tampered.source_agent = "codex-cli".to_owned();
        let second = ingest_events(&store, vec![IngestEvent::Session(tampered)]).await?;
        assert_eq!(count_status(&second, OutcomeStatus::Error), 1);
        let err_row = second
            .iter()
            .find(|outcome| outcome.status == OutcomeStatus::Error)
            .expect("error outcome present");
        let err = err_row.error.as_ref().expect("error body present");
        assert_eq!(err.field, Some("source_agent"));
        assert_eq!(err.reason, Some("immutable"));

        // The rejected write must not have altered the stored row.
        let stored = store
            .get_session(&base_session().id)
            .await?
            .expect("session row survives the rejected re-ingest");
        assert_eq!(stored.session.source_agent, "claude-code");

        Ok(())
    }
4384
    /// Re-ingests `base_session()` with a different `project` and checks
    /// that an error outcome names the `project` field and that the stored
    /// project is still `/home/me/proj`.
    #[tokio::test(flavor = "multi_thread")]
    async fn re_ingesting_with_changed_project_is_rejected() -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let store = Store::open_local(temp.path()).await?;

        let first = ingest_events(&store, vec![IngestEvent::Session(base_session())]).await?;
        assert_eq!(count_status(&first, OutcomeStatus::Error), 0);

        let mut tampered = base_session();
        tampered.project = crate::adapter::Extracted::from_test_value("/somewhere/else".to_owned());
        let second = ingest_events(&store, vec![IngestEvent::Session(tampered)]).await?;
        let err_row = second
            .iter()
            .find(|outcome| outcome.status == OutcomeStatus::Error)
            .expect("project change must surface an error outcome");
        assert_eq!(err_row.error.as_ref().unwrap().field, Some("project"));

        // The rejected write must not have altered the stored row.
        let stored = store
            .get_session(&base_session().id)
            .await?
            .expect("session row survives");
        assert_eq!(
            stored.session.project.as_str(),
            "/home/me/proj",
            "stored project must remain the original",
        );

        Ok(())
    }
4414
    /// Runs two validator passes over the same session. The first inserts
    /// the session with messages `m1`/`m2`; the second re-sends the session
    /// with three new messages. The second pass must report the session as
    /// `Matched` and every new message and part as `Inserted`, both in the
    /// batch counts and in the per-row outcomes.
    #[tokio::test(flavor = "multi_thread")]
    async fn batched_flush_attributes_new_messages_on_existing_session() -> anyhow::Result<()> {
        use crate::wire::Provenance;
        let temp = TempDir::new()?;
        let store = Store::open_local(temp.path()).await?;
        let session = base_session();

        // Builds a conversational text part attached to `message_id`.
        let text_part = |part_id: &str, message_id: &str, body: &str| Part {
            session_id: session.id.clone(),
            id: part_id.to_owned(),
            message_id: message_id.to_owned(),
            ordinal: 0,
            provenance: Provenance::Conversational,
            options: ProviderOptions::new(),
            kind: PartKind::Text {
                text: Some(Extracted::from_test_value(body.to_owned())),
            },
        };
        // Builds a user message in the shared session.
        let user_message = |id: &str| Message::User {
            id: id.to_owned(),
            session_id: session.id.clone(),
            timestamp: Utc::now(),
            options: ProviderOptions::new(),
        };

        // First pass: everything is new.
        let mut validator = IngestValidator::default();
        validator
            .push(&store, 0, IngestEvent::Session(session.clone()))
            .await?;
        validator
            .push(&store, 1, IngestEvent::Message(user_message("m1")))
            .await?;
        validator
            .push(&store, 2, IngestEvent::Part(text_part("p1", "m1", "alpha")))
            .await?;
        validator
            .push(&store, 3, IngestEvent::Message(user_message("m2")))
            .await?;
        validator
            .push(&store, 4, IngestEvent::Part(text_part("p2", "m2", "beta")))
            .await?;
        let (_first_outcomes, first_counts) = validator.finish(&store).await?;
        assert_eq!(first_counts.sessions_inserted, 1);
        assert_eq!(first_counts.messages_inserted_total, 2);
        assert_eq!(first_counts.messages_inserted_searchable, 2);

        // Second pass: same session row, messages m3..m5 with parts p3..p5.
        let mut validator = IngestValidator::default();
        validator
            .push(&store, 0, IngestEvent::Session(session.clone()))
            .await?;
        for (idx, mid) in ["m3", "m4", "m5"].iter().enumerate() {
            let pid = format!("p{}", idx + 3);
            // Event indices continue after the session event at index 0:
            // 1, 2, 3, ... in message/part pairs.
            validator
                .push(&store, idx * 2 + 1, IngestEvent::Message(user_message(mid)))
                .await?;
            validator
                .push(
                    &store,
                    idx * 2 + 2,
                    IngestEvent::Part(text_part(&pid, mid, "gamma")),
                )
                .await?;
        }
        let (second_outcomes, second_counts) = validator.finish(&store).await?;

        assert_eq!(
            second_counts.sessions_inserted, 0,
            "existing session row must report as Matched, not Inserted",
        );
        assert_eq!(second_counts.sessions_matched, 1);
        assert_eq!(
            second_counts.messages_inserted_total, 3,
            "the three NEW messages must register as Inserted in BatchCounts",
        );
        assert_eq!(
            second_counts.messages_inserted_searchable, 3,
            "all three new messages carry conversational text -> searchable",
        );
        assert_eq!(second_counts.messages_matched_total, 0);
        assert_eq!(second_counts.parts_inserted, 3);
        assert_eq!(second_counts.parts_matched, 0);

        // The per-row outcomes must agree with the aggregate counts.
        let session_outcome = second_outcomes
            .iter()
            .find(|outcome| outcome.kind == "session")
            .expect("session-row outcome present");
        assert_eq!(session_outcome.status, OutcomeStatus::Matched);
        for outcome in &second_outcomes {
            if outcome.kind == "message" || outcome.kind == "part" {
                assert_eq!(
                    outcome.status,
                    OutcomeStatus::Inserted,
                    "new row must be Inserted, got: {outcome:?}",
                );
            }
        }
        Ok(())
    }
4524
    /// Convenience wrapper around `store_with_messages_at_threshold` that
    /// passes `VECTOR_INDEX_ACTIVATION_ROWS` as the threshold argument.
    async fn store_with_messages(
        temp: &TempDir,
        count: usize,
    ) -> anyhow::Result<(Store, Vec<MessageKey>)> {
        store_with_messages_at_threshold(temp, count, VECTOR_INDEX_ACTIVATION_ROWS).await
    }
4534
4535 async fn store_with_messages_at_threshold(
4538 temp: &TempDir,
4539 count: usize,
4540 _vector_threshold: usize,
4541 ) -> anyhow::Result<(Store, Vec<MessageKey>)> {
4542 let store = Store::open_local(temp.path()).await?;
4543 let sessions = 8.min(count.max(1));
4544 let mut events = Vec::new();
4545 for s in 0..sessions {
4546 events.push(IngestEvent::Session(Session {
4547 id: format!("session-{s}"),
4548 parent_session_id: None,
4549 parent_message_id: None,
4550 source_agent: "claude-code".to_owned(),
4551 created_at: Utc::now(),
4552 project: Extracted::from_test_value(format!("/proj/{}", s % 4)),
4553 options: ProviderOptions::new(),
4554 }));
4555 for i in (s..count).step_by(sessions) {
4556 let message_id = format!("msg-{i}");
4557 events.push(IngestEvent::Message(Message::User {
4558 id: message_id.clone(),
4559 session_id: format!("session-{s}"),
4560 timestamp: Utc::now(),
4561 options: ProviderOptions::new(),
4562 }));
4563 events.push(IngestEvent::Part(Part {
4564 session_id: format!("session-{s}"),
4565 id: format!("{message_id}-part"),
4566 message_id,
4567 ordinal: 0,
4568 provenance: crate::wire::Provenance::Conversational,
4569 options: ProviderOptions::new(),
4570 kind: PartKind::Text {
4571 text: Some(Extracted::from_test_value(format!("synthetic message {i}"))),
4572 },
4573 }));
4574 }
4575 }
4576 ingest_events(&store, events).await?;
4577 let keys = (0..count)
4578 .map(|i| MessageKey {
4579 session_id: format!("session-{}", i % sessions),
4580 message_id: format!("msg-{i}"),
4581 })
4582 .collect();
4583 Ok((store, keys))
4584 }
4585
4586 fn synthetic_vector(seed: usize) -> Vec<f32> {
4588 let mut state = (seed as u64)
4589 .wrapping_mul(0x9E37_79B9_7F4A_7C15)
4590 .wrapping_add(1);
4591 (0..embedding_dim())
4592 .map(|_| {
4593 state = state.wrapping_mul(6364136223846793005).wrapping_add(1);
4594 #[allow(clippy::cast_precision_loss)]
4595 let unit = (state >> 33) as f32 / (1u64 << 31) as f32;
4596 unit - 1.0
4597 })
4598 .collect()
4599 }
4600
4601 fn embedded(keys: &[MessageKey]) -> Vec<EmbeddedMessage> {
4603 keys.iter()
4604 .enumerate()
4605 .map(|(seed, key)| EmbeddedMessage {
4606 session_id: key.session_id.clone(),
4607 id: key.message_id.clone(),
4608 vector: synthetic_vector(seed),
4609 })
4610 .collect()
4611 }
4612
4613 fn embedding_update_batch_with_model(
4614 rows: &[EmbeddedMessage],
4615 model: &str,
4616 ) -> Result<RecordBatch> {
4617 let mut batch = embedding_update_batch(rows)?;
4618 let columns = batch
4619 .columns()
4620 .iter()
4621 .take(3)
4622 .cloned()
4623 .chain(std::iter::once(
4624 Arc::new(StringArray::from(vec![model; rows.len()])) as _,
4625 ))
4626 .collect::<Vec<_>>();
4627 batch = RecordBatch::try_new(batch.schema(), columns)?;
4628 Ok(batch)
4629 }
4630
    /// Explains a vector query filtered on `session_id` and checks the plan
    /// text: it must contain a `ScalarIndexQuery` node and must not contain
    /// a `FilterExec` line that mentions `session_id`.
    #[tokio::test]
    async fn filtered_vector_scan_pushes_scalar_predicate_into_the_index() -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let (store, keys) = store_with_messages(&temp, 4).await?;
        store.write_embeddings(&embedded(&keys)).await?;
        store
            .optimize_indices(None, &MaintenancePolicy::always_compact())
            .await?
            .into_result()?;

        let query = vec![0.01_f32; embedding_dim()];
        let plan = store
            .explain_vector_plan(
                &query,
                10,
                &Predicate::Eq("session_id", "session-3".into()),
                None,
            )
            .await?;

        assert!(
            plan.contains("ScalarIndexQuery"),
            "expected a ScalarIndexQuery node in the plan:\n{plan}",
        );
        // A FilterExec line naming the column would mean the predicate was
        // applied after the scan rather than through the index.
        let predicate_postfiltered = plan
            .lines()
            .any(|line| line.contains("FilterExec") && line.contains("session_id"));
        assert!(
            !predicate_postfiltered,
            "the scalar predicate must not fall back to a FilterExec postfilter:\n{plan}",
        );
        Ok(())
    }
4671
    /// Embeds 255 rows, optimizes with a threshold of 256 and checks that
    /// `MESSAGES_VECTOR_INDEX` is absent; embeds one more row, optimizes
    /// again and checks that the index now exists and that a vector search
    /// returns the first embedded key.
    #[tokio::test]
    async fn vector_index_activates_when_threshold_is_crossed() -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let (store, keys) = store_with_messages_at_threshold(&temp, 300, 256).await?;

        // One row short of the threshold passed to optimize below.
        store.write_embeddings(&embedded(&keys[..255])).await?;
        store
            .optimize_indices_with_vector_threshold(256)
            .await?
            .into_result()?;
        assert!(
            !store
                .handle
                .messages_index_names()
                .await?
                .iter()
                .any(|name| name == MESSAGES_VECTOR_INDEX),
            "IVF_PQ must not exist below the activation threshold",
        );

        // The 256th embedded row reaches the threshold.
        store.write_embeddings(&embedded(&keys[255..256])).await?;
        store
            .optimize_indices_with_vector_threshold(256)
            .await?
            .into_result()?;
        assert!(
            store
                .handle
                .messages_index_names()
                .await?
                .iter()
                .any(|name| name == MESSAGES_VECTOR_INDEX),
            "optimize must create the IVF_PQ once the threshold is crossed",
        );

        // Query with the exact vector written for `keys[0]`, with an empty
        // `And` predicate.
        let hits = store
            .vector_search(&synthetic_vector(0), 10, &Predicate::And(Vec::new()), None)
            .await?;
        assert!(
            hits.iter().any(|(key, _)| key == &keys[0]),
            "an embedded row is retrievable via the index",
        );
        Ok(())
    }
4722
    /// Simulates an embedding-model swap. Rows are first written tagged with
    /// `"old-model"` and indexed, so every row counts as stale. The vector
    /// index is then dropped, the rows are re-embedded through
    /// `write_embeddings`, and optimize is expected to rebuild the index
    /// with no rows left pending or stale.
    #[tokio::test]
    async fn model_swap_force_re_embeds_only_stale_rows_and_rebuilds_ivf_pq() -> anyhow::Result<()>
    {
        let temp = TempDir::new()?;
        let (store, keys) = store_with_messages_at_threshold(&temp, 300, 256).await?;
        // Write embeddings under a model name other than the current one,
        // going through `merge_update` directly.
        let old_rows = embedded(&keys);
        let old_batch = embedding_update_batch_with_model(&old_rows, "old-model")?;
        store
            .handle
            .merge_update(Table::Messages, old_batch, old_rows.len())
            .await?;
        store
            .optimize_indices_with_vector_threshold(256)
            .await?
            .into_result()?;
        assert!(
            store
                .handle
                .messages_index_names()
                .await?
                .iter()
                .any(|name| name == MESSAGES_VECTOR_INDEX),
            "IVF_PQ must exist before a model swap",
        );
        assert_eq!(store.stale_embedding_count().await?, keys.len());

        // Drop the index and confirm every row is offered for re-embedding.
        store.drop_vector_index().await?;
        let mut pending = Vec::new();
        let stream = store.pending_or_stale_messages();
        tokio::pin!(stream);
        while let Some(row) = stream.next().await {
            pending.push(row?);
        }
        assert_eq!(
            pending.len(),
            keys.len(),
            "force stream should see stale rows"
        );
        // Re-embed through the normal write path; nothing should stay stale.
        store.write_embeddings(&embedded(&keys)).await?;
        assert_eq!(store.stale_embedding_count().await?, 0);
        store
            .optimize_indices_with_vector_threshold(256)
            .await?
            .into_result()?;
        assert!(
            store
                .handle
                .messages_index_names()
                .await?
                .iter()
                .any(|name| name == MESSAGES_VECTOR_INDEX),
            "optimize must rebuild IVF_PQ after force re-embed",
        );

        // With all rows re-embedded the stream must be empty.
        let stream = store.pending_or_stale_messages();
        tokio::pin!(stream);
        assert!(stream.next().await.is_none(), "up-to-date rows are skipped");
        Ok(())
    }
4782
    /// Adds extra session upserts, runs `cleanup_old_versions` with a zero
    /// retention on the sessions dataset, and checks that
    /// `session_last_ingested_at` still returns at least one entry per
    /// session row.
    #[tokio::test]
    async fn session_last_ingested_at_falls_back_when_versions_pruned() -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let (store, _keys) = store_with_messages(&temp, 4).await?;

        // Three more single-session upserts on top of the initial ingest.
        for tag in 0..3 {
            let extra = synthetic_session(&format!("extra-{tag}"));
            store.upsert_sessions(&[extra]).await?;
        }

        let dataset = store.handle.dataset(Table::Sessions).await?;
        dataset
            .cleanup_old_versions(chrono::Duration::zero(), None, Some(false))
            .await
            .context("cleanup_old_versions failed")?;

        let map = store.session_last_ingested_at().await?;
        let session_count = store.row_counts().await?.0;
        assert!(
            map.len() >= session_count,
            "watermark map ({}) must still cover every session ({}) after \
             version cleanup; an empty fallback regresses pond sync to a \
             full re-scan",
            map.len(),
            session_count,
        );
        Ok(())
    }
4825
    /// Tracks `embedding_progress` across three states of a ten-message
    /// store: nothing embedded, four rows embedded, all rows embedded.
    #[tokio::test]
    async fn embedding_progress_counts_embedded_and_eligible_rows() -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let (store, keys) = store_with_messages(&temp, 10).await?;

        // Before any embedding is written.
        let before = store.embedding_progress().await?;
        assert_eq!(before.embedded, 0);
        assert_eq!(before.total, 10);
        assert_eq!(before.model, crate::embed::model_id());

        // After embedding the first four rows.
        store.write_embeddings(&embedded(&keys[..4])).await?;
        let partial = store.embedding_progress().await?;
        assert_eq!(partial.embedded, 4);
        assert_eq!(partial.total, 10);

        // After embedding the remaining six rows.
        store.write_embeddings(&embedded(&keys[4..])).await?;
        let full = store.embedding_progress().await?;
        assert_eq!(full.embedded, 10);
        assert_eq!(full.total, 10);
        Ok(())
    }
4847}