1use std::{
2 collections::{BTreeMap, HashMap, HashSet},
3 path::Path,
4 sync::Arc,
5};
6
7use anyhow::{Context, Result};
8use async_stream::try_stream;
9use chrono::{DateTime, TimeZone, Utc};
10use lance::Dataset;
11use lance::dataset::{AutoCleanupParams, WriteMode, WriteParams};
12use lance::deps::arrow_array::{
13 Array, FixedSizeListArray, Float16Array, Float32Array, Int32Array, LargeBinaryArray,
14 LargeStringArray, RecordBatch, RecordBatchIterator, StringArray, TimestampMicrosecondArray,
15 UInt64Array, new_null_array,
16};
17use lance::deps::arrow_schema::{DataType, Field, Schema, TimeUnit};
18use lance_file::version::LanceFileVersion;
19use lance_index::scalar::{BuiltinIndexType, FullTextSearchQuery};
20use serde::{Deserialize, Serialize, de::DeserializeOwned};
21use serde_json::Value;
22use tokio_stream::{Stream, StreamExt};
23
24use crate::{
25 config, embed,
26 substrate::{
27 Handle, IndexIntent, IndexParamsKind, IndexStatus, IndexTrigger, MaintenancePolicy,
28 OptimizeProgressFn, PhaseOutcome, Predicate, ScalarValue, ScanOpts, Table,
29 TableOptimizeOutcome, TableSizes, VECTOR_INDEX_ACTIVATION_ROWS,
30 },
31 wire::{
32 FileData, Message, Part, PartKind, ResponseMode, Role, SUMMARY_PART_TYPES, Session,
33 SessionFrom,
34 },
35};
36use url::Url;
37
38#[derive(Debug)]
39pub struct Store {
40 handle: Handle,
41}
42
43#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
44pub struct LanceArchiveCounts {
45 pub sessions: usize,
46 pub messages: usize,
47 pub parts: usize,
48}
49
50#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
51pub struct LanceArchiveVersions {
52 pub sessions: u64,
53 pub messages: u64,
54 pub parts: u64,
55}
56
57#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
58pub struct LanceArchiveExport {
59 pub rows: LanceArchiveCounts,
60 pub source_versions: LanceArchiveVersions,
61}
62
63#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
64pub struct LanceArchiveImport {
65 pub rows: LanceArchiveCounts,
66 pub inserted: LanceArchiveCounts,
67}
68
69#[derive(Debug, Clone, Default)]
70pub struct IndexIntents {
71 pub sessions: Vec<IndexIntent>,
72 pub messages: Vec<IndexIntent>,
73 pub parts: Vec<IndexIntent>,
74}
75
76impl IndexIntents {
77 fn all(&self) -> [(Table, &[IndexIntent]); 3] {
78 [
79 (Table::Sessions, &self.sessions),
80 (Table::Messages, &self.messages),
81 (Table::Parts, &self.parts),
82 ]
83 }
84}
85
/// A message row handed out by the pending-embedding scans.
#[derive(Clone, Debug, PartialEq)]
pub struct PendingMessage {
    /// Id of the session the message belongs to.
    pub session_id: String,
    /// Id of the message.
    pub id: String,
    /// The message's `search_text` column value.
    pub search_text: String,
}
95
/// An embedding vector addressed to one message; the input row type of
/// `Store::write_embeddings`.
#[derive(Clone, Debug, PartialEq)]
pub struct EmbeddedMessage {
    /// Id of the session the message belongs to.
    pub session_id: String,
    /// Id of the message.
    pub id: String,
    /// Embedding values for the message.
    pub vector: Vec<f32>,
}
104
105#[derive(Debug, Clone, PartialEq)]
107pub struct MessageMeta {
108 pub message_id: String,
109 pub session_id: String,
110 pub role: String,
111 pub project: String,
112 pub source_agent: String,
113 pub timestamp: DateTime<Utc>,
114 pub search_text: String,
115}
116
/// Composite key of a message: session id plus message id.
///
/// The derived ordering compares `session_id` first and `message_id` second.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct MessageKey {
    /// Id of the owning session.
    pub session_id: String,
    /// Id of the message.
    pub message_id: String,
}
122
/// Per-row status reported by the upsert methods.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum UpsertStatus {
    /// The row was reported as newly inserted.
    Inserted,
    /// The row was reported as matching an existing row.
    Matched,
}
128
129#[derive(Debug, Default)]
134pub struct OptimizeOutcome {
135 pub tables: Vec<TableOptimizeOutcome>,
136}
137
138impl OptimizeOutcome {
139 pub fn any_indices_failed(&self) -> bool {
142 self.tables.iter().any(|t| t.indices.is_failed())
143 }
144
145 pub fn into_result(self) -> Result<Self> {
149 for table in &self.tables {
150 if let PhaseOutcome::Failed(error) = &table.indices {
151 anyhow::bail!(
152 "indices phase failed on {}: {error:#}",
153 table.table.as_str()
154 );
155 }
156 if let PhaseOutcome::Failed(error) = &table.compaction {
157 anyhow::bail!(
158 "compaction phase failed on {}: {error:#}",
159 table.table.as_str()
160 );
161 }
162 }
163 Ok(self)
164 }
165}
166
167#[derive(Debug, Clone)]
170pub struct CorpusStats {
171 pub data_url: Url,
172 pub totals: RowTotals,
173 pub adapters: Vec<AdapterStats>,
181 pub include_subagents: bool,
185}
186
/// Total row counts of the sessions, messages and parts tables.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RowTotals {
    /// Rows in the sessions table.
    pub sessions: u64,
    /// Rows in the messages table.
    pub messages: u64,
    /// Rows in the parts table.
    pub parts: u64,
}
193
/// Snapshot of embedding progress.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct EmbeddingProgress {
    /// Count of embedded items.
    pub embedded: usize,
    /// Total count of items.
    pub total: usize,
    /// Name of the embedding model.
    pub model: &'static str,
}
205
206#[derive(Debug, Clone)]
207pub struct AdapterStats {
208 pub adapter: String,
212 pub sessions: u64,
213 pub messages: u64,
214 pub projects: Vec<ProjectStats>,
217}
218
/// Statistics for one project within an adapter.
#[derive(Clone, Debug)]
pub struct ProjectStats {
    /// Project name.
    pub project: String,
    /// Session count for the project.
    pub sessions: u64,
    /// Message count for the project.
    pub messages: u64,
}
225
/// Running tally for one group: a message counter plus the set of session ids seen.
#[derive(Default)]
struct GroupAccumulator {
    /// Number of messages counted so far.
    messages: u64,
    /// Distinct session ids encountered so far.
    session_ids: HashSet<String>,
}
231
232#[derive(Debug, Clone, Copy)]
233pub struct MessageWrite<'a> {
234 pub message: &'a Message,
235 pub parts: &'a [Part],
236 pub search_text: Option<&'a str>,
237}
238
239impl Store {
240 pub async fn open(location: &Url) -> Result<Self> {
246 Ok(Self {
247 handle: Handle::open(location).await?,
248 })
249 }
250
251 pub async fn open_with_options(
257 location: &Url,
258 storage_options: std::collections::HashMap<String, String>,
259 caps: crate::substrate::RuntimeCaps,
260 ) -> Result<Self> {
261 Ok(Self {
262 handle: Handle::open_with_options(location, storage_options, caps).await?,
263 })
264 }
265
266 pub async fn open_local(path: impl AsRef<std::path::Path>) -> Result<Self> {
271 let url = config::url_for_path(path)?;
272 Self::open_with_options(
273 &url,
274 std::collections::HashMap::new(),
275 crate::substrate::RuntimeCaps::default(),
276 )
277 .await
278 }
279
280 pub async fn export_clean_lance_datasets(&self, dest: &Path) -> Result<LanceArchiveExport> {
288 std::fs::create_dir_all(dest)
289 .with_context(|| format!("failed to create archive staging dir {}", dest.display()))?;
290 let (sessions, sessions_version) = self
291 .export_clean_table(Table::Sessions, &dest.join("sessions.lance"))
292 .await?;
293 let (messages, messages_version) = self
294 .export_clean_table(Table::Messages, &dest.join("messages.lance"))
295 .await?;
296 let (parts, parts_version) = self
297 .export_clean_table(Table::Parts, &dest.join("parts.lance"))
298 .await?;
299 Ok(LanceArchiveExport {
300 rows: LanceArchiveCounts {
301 sessions,
302 messages,
303 parts,
304 },
305 source_versions: LanceArchiveVersions {
306 sessions: sessions_version,
307 messages: messages_version,
308 parts: parts_version,
309 },
310 })
311 }
312
313 pub async fn import_clean_lance_datasets(&self, source: &Path) -> Result<LanceArchiveImport> {
314 let sessions_dataset =
315 open_archive_table(Table::Sessions, &source.join("sessions.lance")).await?;
316 let messages_dataset =
317 open_archive_table(Table::Messages, &source.join("messages.lance")).await?;
318 let parts_dataset = open_archive_table(Table::Parts, &source.join("parts.lance")).await?;
319 let (sessions, sessions_inserted) = self
320 .import_clean_table(Table::Sessions, sessions_dataset)
321 .await?;
322 let (messages, messages_inserted) = self
323 .import_clean_table(Table::Messages, messages_dataset)
324 .await?;
325 let (parts, parts_inserted) = self.import_clean_table(Table::Parts, parts_dataset).await?;
326 Ok(LanceArchiveImport {
327 rows: LanceArchiveCounts {
328 sessions,
329 messages,
330 parts,
331 },
332 inserted: LanceArchiveCounts {
333 sessions: sessions_inserted,
334 messages: messages_inserted,
335 parts: parts_inserted,
336 },
337 })
338 }
339
340 async fn export_clean_table(&self, table: Table, dest: &Path) -> Result<(usize, u64)> {
341 let dataset = self.handle.dataset(table).await?;
342 let source_version = dataset.version_id();
343 let schema = export_schema(table);
344 let mut stream = dataset
345 .scan()
346 .try_into_stream()
347 .await
348 .with_context(|| format!("failed to scan {} for archive export", table.as_str()))?;
349 let dest_uri = dest
350 .to_str()
351 .with_context(|| format!("archive path is not UTF-8: {}", dest.display()))?;
352
353 let mut rows = 0usize;
354 let mut wrote = false;
355 while let Some(batch) = stream.next().await {
356 let batch = batch
357 .with_context(|| format!("failed to read {} archive batch", table.as_str()))?;
358 rows += batch.num_rows();
359 let reader = RecordBatchIterator::new([Ok(batch.clone())], batch.schema());
360 let mut params = write_params_for_create();
361 if wrote {
362 params.mode = WriteMode::Append;
363 }
364 Dataset::write(reader, dest_uri, Some(params))
365 .await
366 .with_context(|| format!("failed to write {} archive table", table.as_str()))?;
367 wrote = true;
368 }
369
370 if !wrote {
371 let batch = RecordBatch::new_empty(schema.clone());
372 let reader = RecordBatchIterator::new([Ok(batch)], schema);
373 Dataset::write(reader, dest_uri, Some(write_params_for_create()))
374 .await
375 .with_context(|| {
376 format!("failed to write empty {} archive table", table.as_str())
377 })?;
378 }
379 Ok((rows, source_version))
380 }
381
382 async fn import_clean_table(&self, table: Table, dataset: Dataset) -> Result<(usize, usize)> {
383 let mut stream = dataset
384 .scan()
385 .try_into_stream()
386 .await
387 .with_context(|| format!("failed to scan {} archive table", table.as_str()))?;
388 let mut rows = 0usize;
389 let mut inserted = 0usize;
390 while let Some(batch) = stream.next().await {
391 let batch = batch
392 .with_context(|| format!("failed to read {} archive batch", table.as_str()))?;
393 let row_count = batch.num_rows();
394 rows += row_count;
395 inserted += self
396 .handle
397 .merge_insert(table, batch, row_count)
398 .await
399 .with_context(|| format!("failed to import {} archive table", table.as_str()))?
400 as usize;
401 }
402 Ok((rows, inserted))
403 }
404
405 pub async fn upsert_sessions(&self, sessions: &[Session]) -> Result<Vec<UpsertStatus>> {
406 if sessions.is_empty() {
407 return Ok(Vec::new());
408 }
409 let batches = sessions_batches(sessions)?;
410 let inserted = merge_insert_chunks(&self.handle, Table::Sessions, batches).await?;
411 Ok(statuses_from_inserted(sessions.len(), inserted))
413 }
414
/// Writes a batch of completed substreams to the sessions, messages and parts
/// tables and reports one outcome list for the whole batch.
///
/// Steps, in order:
/// 1. Substreams sharing a session id are collapsed into the first one seen.
///    A later substream whose `source_agent` or `project` differs from the first
///    is rejected with error outcomes; otherwise its not-yet-seen messages are
///    appended to the first.
/// 2. Existing session rows, message keys and part keys are loaded for all
///    session ids in the batch.
/// 3. Substreams whose session fails `ensure_immutable_match` against the stored
///    session are rejected with error outcomes.
/// 4. The remaining substreams are turned into record batches and merge-inserted
///    into the three tables concurrently.
/// 5. Success outcomes are produced for every written substream.
///
/// Returns the outcomes sorted by their `index`, together with the batch counts.
///
/// # Errors
///
/// Fails when a scan, a row decode, a batch build or a merge-insert fails.
async fn upsert_session_batch(
    &self,
    substreams: Vec<CompletedSubstream>,
) -> Result<(Vec<RowOutcome>, BatchCounts)> {
    if substreams.is_empty() {
        return Ok((Vec::new(), BatchCounts::default()));
    }

    let mut outcomes: Vec<RowOutcome> = Vec::with_capacity(substreams.len());
    let mut counts = BatchCounts::default();

    // Step 1: collapse substreams that target the same session id.
    // `by_session_id` maps a session id to its slot in `merged`.
    let mut merged: Vec<CompletedSubstream> = Vec::with_capacity(substreams.len());
    let mut by_session_id: std::collections::HashMap<String, usize> =
        std::collections::HashMap::with_capacity(substreams.len());
    for substream in substreams {
        if let Some(&existing_idx) = by_session_id.get(&substream.session.id) {
            let existing = &merged[existing_idx];
            if existing.session.source_agent != substream.session.source_agent
                || existing.session.project != substream.session.project
            {
                // A source_agent mismatch is reported in preference to a project mismatch.
                let reason = if existing.session.source_agent != substream.session.source_agent
                {
                    IngestError::ImmutableField {
                        field: "source_agent",
                        session_id: substream.session.id.clone(),
                        stored: existing.session.source_agent.clone(),
                        attempted: substream.session.source_agent.clone(),
                    }
                } else {
                    IngestError::ImmutableField {
                        field: "project",
                        session_id: substream.session.id.clone(),
                        stored: (*existing.session.project).clone(),
                        attempted: (*substream.session.project).clone(),
                    }
                };
                let field = match &reason {
                    IngestError::ImmutableField { field, .. } => Some(*field),
                };
                let reason_key = match field {
                    Some("project") => DROP_REASON_IMMUTABLE_PROJECT,
                    Some("source_agent") => DROP_REASON_IMMUTABLE_SOURCE_AGENT,
                    _ => DROP_REASON_UNCATEGORIZED,
                };
                outcomes.extend(error_outcomes_for_substream(
                    substream.session_index,
                    &substream.session,
                    &substream.messages,
                    reason.to_string(),
                    field,
                    reason_key,
                ));
                continue;
            }
            // Same session, compatible fields: append only messages whose id
            // the merged entry does not already contain.
            let existing = &mut merged[existing_idx];
            let mut seen: std::collections::HashSet<String> = existing
                .messages
                .iter()
                .map(|m| m.message.id().to_owned())
                .collect();
            for msg in substream.messages {
                if seen.insert(msg.message.id().to_owned()) {
                    existing.messages.push(msg);
                }
            }
            continue;
        }
        by_session_id.insert(substream.session.id.clone(), merged.len());
        merged.push(substream);
    }

    // Step 2: load what is already stored for the session ids in this batch.
    let session_id_values: Vec<ScalarValue> = merged
        .iter()
        .map(|substream| ScalarValue::String(substream.session.id.clone()))
        .collect();
    let existing_sessions: std::collections::HashMap<String, Session> =
        if session_id_values.is_empty() {
            std::collections::HashMap::new()
        } else {
            let batch = self
                .handle
                .scan_batch(
                    Table::Sessions,
                    Some(&Predicate::In("id", session_id_values.clone())),
                    &[],
                )
                .await?;
            let mut map = std::collections::HashMap::with_capacity(batch.num_rows());
            for row in 0..batch.num_rows() {
                let session = session_from_batch(&batch, row)?;
                map.insert(session.id.clone(), session);
            }
            map
        };
    // Stored message keys as (session_id, message id).
    let existing_message_pks: HashSet<(String, String)> = if session_id_values.is_empty() {
        HashSet::new()
    } else {
        let batch = self
            .handle
            .scan_batch(
                Table::Messages,
                Some(&Predicate::In("session_id", session_id_values.clone())),
                &["session_id", "id"],
            )
            .await?;
        let mut set = HashSet::with_capacity(batch.num_rows());
        for row in 0..batch.num_rows() {
            let sid = string(&batch, "session_id", row)?.context("session_id is null")?;
            let mid = string(&batch, "id", row)?.context("message id is null")?;
            set.insert((sid, mid));
        }
        set
    };
    // Stored part keys as (session_id, message_id, part id).
    let existing_part_pks: HashSet<(String, String, String)> = if session_id_values.is_empty() {
        HashSet::new()
    } else {
        let batch = self
            .handle
            .scan_batch(
                Table::Parts,
                Some(&Predicate::In("session_id", session_id_values)),
                &["session_id", "message_id", "id"],
            )
            .await?;
        let mut set = HashSet::with_capacity(batch.num_rows());
        for row in 0..batch.num_rows() {
            let sid = string(&batch, "session_id", row)?.context("session_id is null")?;
            let mid = string(&batch, "message_id", row)?.context("message_id is null")?;
            let pid = string(&batch, "id", row)?.context("part id is null")?;
            set.insert((sid, mid, pid));
        }
        set
    };

    // Step 3: drop substreams that conflict with the stored session row.
    let mut writeable: Vec<CompletedSubstream> = Vec::with_capacity(merged.len());
    for substream in merged {
        if let Some(existing) = existing_sessions.get(&substream.session.id)
            && let Err(failure) = ensure_immutable_match(existing, &substream.session)
        {
            let field = match &failure {
                IngestError::ImmutableField { field, .. } => Some(*field),
            };
            let reason_key = match field {
                Some("project") => DROP_REASON_IMMUTABLE_PROJECT,
                Some("source_agent") => DROP_REASON_IMMUTABLE_SOURCE_AGENT,
                _ => DROP_REASON_UNCATEGORIZED,
            };
            outcomes.extend(error_outcomes_for_substream(
                substream.session_index,
                &substream.session,
                &substream.messages,
                failure.to_string(),
                field,
                reason_key,
            ));
            continue;
        }
        writeable.push(substream);
    }

    // Nothing left to write: report the rejections only.
    if writeable.is_empty() {
        outcomes.sort_by_key(|outcome| outcome.index);
        return Ok((outcomes, counts));
    }

    // Step 4: flatten the writeable substreams into per-table rows.
    let sessions_owned: Vec<Session> = writeable
        .iter()
        .map(|substream| substream.session.clone())
        .collect();
    let message_rows: Vec<MessageBatchRow<'_>> = writeable
        .iter()
        .flat_map(|substream| {
            substream.messages.iter().map(|buffered| MessageBatchRow {
                message: &buffered.message,
                source_agent: &substream.session.source_agent,
                project: &substream.session.project,
                search_text: buffered.search_text.as_deref(),
            })
        })
        .collect();
    let part_rows: Vec<Part> = writeable
        .iter()
        .flat_map(|substream| {
            substream.messages.iter().flat_map(|buffered| {
                buffered
                    .parts
                    .iter()
                    .map(|buffered_part| buffered_part.part.clone())
            })
        })
        .collect();

    let session_batches = sessions_batches(&sessions_owned)?;
    let message_batches = messages_batches(&message_rows)?;
    let part_batches = parts_batches(&part_rows)?;

    // The three table writes run concurrently; the inserted counts are not used here.
    let (_sessions_inserted, _messages_inserted, _parts_inserted) = tokio::try_join!(
        merge_insert_chunks(&self.handle, Table::Sessions, session_batches),
        merge_insert_chunks(&self.handle, Table::Messages, message_batches),
        merge_insert_chunks(&self.handle, Table::Parts, part_batches),
    )?;

    // Step 5: success outcomes are computed against the pre-write snapshots
    // loaded in step 2.
    for substream in &writeable {
        outcomes.extend(success_outcomes_for_substream(
            substream.session_index,
            &substream.session,
            &substream.messages,
            &existing_sessions,
            &existing_message_pks,
            &existing_part_pks,
            &mut counts,
        ));
    }

    outcomes.sort_by_key(|outcome| outcome.index);
    Ok((outcomes, counts))
}
677
678 pub async fn upsert_messages(
679 &self,
680 session: &Session,
681 messages: &[MessageWrite<'_>],
682 ) -> Result<Vec<UpsertStatus>> {
683 if messages.is_empty() {
684 return Ok(Vec::new());
685 }
686
687 let rows = messages
688 .iter()
689 .map(|write| MessageBatchRow {
690 message: write.message,
691 source_agent: &session.source_agent,
692 project: &session.project,
693 search_text: write.search_text,
694 })
695 .collect::<Vec<_>>();
696 let batches = messages_batches(&rows)?;
697 let inserted = merge_insert_chunks(&self.handle, Table::Messages, batches).await?;
698 Ok(statuses_from_inserted(messages.len(), inserted))
699 }
700
701 pub async fn upsert_parts(&self, parts: &[Part]) -> Result<Vec<UpsertStatus>> {
702 if parts.is_empty() {
703 return Ok(Vec::new());
704 }
705 let batches = parts_batches(parts)?;
706 let inserted = merge_insert_chunks(&self.handle, Table::Parts, batches).await?;
707 Ok(statuses_from_inserted(parts.len(), inserted))
708 }
709
710 pub async fn get_session(&self, session_id: &str) -> Result<Option<SessionWithMessages>> {
711 let Some(session) = self.find_session(session_id).await? else {
712 return Ok(None);
713 };
714 let messages = self.messages_for_session(session_id).await?;
715 Ok(Some(SessionWithMessages { session, messages }))
716 }
717
718 pub async fn session_ids(&self) -> Result<Vec<String>> {
720 let batch = self
721 .handle
722 .scan_batch(Table::Sessions, None, &["id"])
723 .await?;
724 let mut ids = Vec::with_capacity(batch.num_rows());
725 for row in 0..batch.num_rows() {
726 if let Some(id) = string(&batch, "id", row)? {
727 ids.push(id);
728 }
729 }
730 Ok(ids)
731 }
732
733 pub async fn child_sessions(&self, parent_session_id: &str) -> Result<Vec<Session>> {
734 let batch = self
735 .handle
736 .scan_batch(
737 Table::Sessions,
738 Some(&Predicate::Eq(
739 "parent_session_id",
740 parent_session_id.into(),
741 )),
742 &[
743 "id",
744 "parent_session_id",
745 "parent_message_id",
746 "source_agent",
747 "created_at",
748 "project",
749 "options",
750 ],
751 )
752 .await?;
753 let mut sessions = Vec::with_capacity(batch.num_rows());
754 for row in 0..batch.num_rows() {
755 sessions.push(session_from_batch(&batch, row)?);
756 }
757 sessions.sort_by(|left, right| left.id.cmp(&right.id));
758 Ok(sessions)
759 }
760
761 pub async fn session_last_ingested_at(&self) -> Result<HashMap<String, DateTime<Utc>>> {
767 use lance::deps::arrow_array::UInt64Array;
768
769 let dataset = self.handle.dataset(Table::Sessions).await?;
770 let version_list = dataset.versions().await?;
771 let versions: HashMap<u64, DateTime<Utc>> = version_list
772 .iter()
773 .map(|v| (v.version, v.timestamp))
774 .collect();
775 let oldest_visible_ts = version_list.iter().map(|v| v.timestamp).min();
783
784 let scanner = self
785 .handle
786 .scan(
787 Table::Sessions,
788 ScanOpts::project_only(&["id", "_row_last_updated_at_version"]),
789 )
790 .await?;
791 let mut stream = scanner.try_into_stream().await?;
792 let mut out: HashMap<String, DateTime<Utc>> = HashMap::new();
793 while let Some(batch) = stream.next().await {
794 let batch = batch?;
795 let version_array = batch
796 .column_by_name("_row_last_updated_at_version")
797 .context("missing _row_last_updated_at_version column")?
798 .as_any()
799 .downcast_ref::<UInt64Array>()
800 .context("_row_last_updated_at_version is not UInt64")?;
801 for row in 0..batch.num_rows() {
802 let Some(id) = string(&batch, "id", row)? else {
803 continue;
804 };
805 if version_array.is_null(row) {
806 continue;
807 }
808 let version = version_array.value(row);
809 let ts = versions.get(&version).copied().or(oldest_visible_ts);
810 if let Some(ts) = ts {
811 out.insert(id, ts);
812 }
813 }
814 }
815 Ok(out)
816 }
817
/// Returns one page of a session's messages.
///
/// The messages are loaded according to `params.mode`, ordered by timestamp and
/// then id, optionally cut to those after `params.after_id`, and then paged from
/// the start or from the end of what remains (`params.session_from`), bounded by
/// `params.limit` and `params.budget_bytes`.
///
/// Returns `GetLookup::NotFound` when the session does not exist and
/// `GetLookup::UnknownAfterId` when `params.after_id` matches no loaded message.
///
/// # Errors
///
/// Fails when a lookup or scan against the store fails.
pub async fn session_view(
    &self,
    session_id: &str,
    params: SessionViewParams<'_>,
) -> Result<GetLookup<SessionPage>> {
    let Some(session) = self.find_session(session_id).await? else {
        return Ok(GetLookup::NotFound);
    };

    // Conversational mode only loads messages with search text and carries no
    // `content`; the other modes load every message of the session.
    let mut rows = match params.mode {
        ResponseMode::Conversational => self
            .scan_conversational_messages(session_id)
            .await?
            .into_iter()
            .map(|row| ScanRow {
                id: row.message_id,
                role: row.role,
                timestamp: row.timestamp,
                text: Some(row.text.into_inner()),
                content: None,
            })
            .collect(),
        ResponseMode::Complete | ResponseMode::Verbatim => {
            self.scan_all_messages(session_id).await?
        }
    };
    rows.sort_by(|a, b| a.timestamp.cmp(&b.timestamp).then_with(|| a.id.cmp(&b.id)));

    // Paging resumes right after the anchor message, if one was given.
    let start_at = match params.after_id {
        Some(after) => match rows.iter().position(|row| row.id == after) {
            Some(idx) => idx + 1,
            None => return Ok(GetLookup::UnknownAfterId),
        },
        None => 0,
    };
    let remaining = rows.get(start_at..).unwrap_or(&[]);
    let (emitted, messages_remaining) = match params.session_from {
        SessionFrom::Start => {
            // Row size is the byte length of its text (0 when there is none).
            let n = page_by(remaining, params.limit, params.budget_bytes, |row| {
                row.text.as_deref().map_or(0, str::len)
            });
            (&remaining[..n], remaining.len() - n)
        }
        SessionFrom::End => {
            // Walk backwards from the newest row, growing the window
            // `remaining[start..]` until the limit or the byte budget stops it.
            let mut bytes = 0usize;
            let mut start = remaining.len();
            for row in remaining.iter().rev() {
                if remaining.len() - start >= params.limit {
                    break;
                }
                let size = row.text.as_deref().map_or(0, str::len);
                // `start < remaining.len()` means at least one row is already
                // selected, so the first row is taken even if it exceeds the budget.
                if start < remaining.len() && bytes + size > params.budget_bytes {
                    break;
                }
                bytes += size;
                start -= 1;
            }
            // Everything before `start` is left for later pages.
            (&remaining[start..], start)
        }
    };
    let ids: Vec<String> = emitted.iter().map(|row| row.id.clone()).collect();

    // Verbatim mode loads the parts via `parts_for_messages`; the other modes
    // load them via `summary_parts_for_messages`.
    let mut parts_by_message = match params.mode {
        ResponseMode::Verbatim => self.parts_for_messages(session_id, &ids).await?,
        ResponseMode::Conversational | ResponseMode::Complete => {
            self.summary_parts_for_messages(session_id, &ids).await?
        }
    };
    let messages = emitted
        .iter()
        .map(|row| RetrievedMessage {
            id: row.id.clone(),
            role: row.role,
            timestamp: row.timestamp,
            text: row.text.clone(),
            content: row.content.clone(),
            parts: parts_by_message
                .remove(&(session_id.to_owned(), row.id.clone()))
                .unwrap_or_default(),
        })
        .collect();

    Ok(GetLookup::Found(SessionPage {
        session,
        messages,
        messages_remaining,
    }))
}
919
920 pub async fn message_view(
926 &self,
927 message_id: &str,
928 params: MessageViewParams<'_>,
929 ) -> Result<GetLookup<MessagePage>> {
930 let Some(session_id) = self.session_id_for_message(message_id).await? else {
931 return Ok(GetLookup::NotFound);
932 };
933 let Some(session) = self.find_session(&session_id).await? else {
934 return Ok(GetLookup::NotFound);
935 };
936 let mut rows = self.scan_all_messages(&session_id).await?;
937 if matches!(params.mode, ResponseMode::Conversational) {
943 rows.retain(|row| row.text.is_some() || row.id == message_id);
944 }
945 rows.sort_by(|a, b| a.timestamp.cmp(&b.timestamp).then_with(|| a.id.cmp(&b.id)));
946 let Some(target_pos) = rows.iter().position(|row| row.id == message_id) else {
947 return Ok(GetLookup::NotFound);
948 };
949
950 let start = target_pos.saturating_sub(params.context_depth);
951 let end = (target_pos + params.context_depth + 1).min(rows.len());
952 let window = &rows[start..end];
953 let window_ids: Vec<String> = window.iter().map(|row| row.id.clone()).collect();
954 let mut parts_by_message = self.parts_for_messages(&session_id, &window_ids).await?;
957
958 let all_parts = parts_by_message
959 .remove(&(session_id.clone(), message_id.to_owned()))
960 .unwrap_or_default();
961 let start_part = match params.after_id {
962 Some(after) => match all_parts.iter().find(|part| part.id == after) {
966 Some(anchor) => all_parts
967 .iter()
968 .position(|part| part.ordinal > anchor.ordinal)
969 .unwrap_or(all_parts.len()),
970 None => return Ok(GetLookup::UnknownAfterId),
971 },
972 None => 0,
973 };
974 let remaining_parts = all_parts.get(start_part..).unwrap_or(&[]);
975 let part_count = page_by(remaining_parts, params.limit, params.budget_bytes, |part| {
976 serde_json::to_string(part).map_or(0, |json| json.len())
977 });
978 let target_parts = remaining_parts[..part_count].to_vec();
979 let target_parts_remaining = remaining_parts.len() - part_count;
980
981 let target_row = &rows[target_pos];
982 let target = RetrievedMessage {
983 id: target_row.id.clone(),
984 role: target_row.role,
985 timestamp: target_row.timestamp,
986 text: target_row.text.clone(),
987 content: target_row.content.clone(),
988 parts: Vec::new(),
990 };
991 let siblings = window
992 .iter()
993 .enumerate()
994 .filter(|(idx, _)| start + idx != target_pos)
995 .map(|(_, row)| RetrievedMessage {
996 id: row.id.clone(),
997 role: row.role,
998 timestamp: row.timestamp,
999 text: row.text.clone(),
1000 content: row.content.clone(),
1001 parts: parts_by_message
1002 .get(&(session_id.clone(), row.id.clone()))
1003 .cloned()
1004 .unwrap_or_default(),
1005 })
1006 .collect();
1007
1008 Ok(GetLookup::Found(MessagePage {
1009 session,
1010 target,
1011 target_parts,
1012 target_parts_remaining,
1013 siblings,
1014 }))
1015 }
1016
1017 async fn scan_all_messages(&self, session_id: &str) -> Result<Vec<ScanRow>> {
1018 let batch = self
1019 .handle
1020 .scan_batch(
1021 Table::Messages,
1022 Some(&Predicate::Eq("session_id", session_id.into())),
1023 &["id", "timestamp", "role", "search_text", "content"],
1024 )
1025 .await?;
1026 let mut rows = Vec::with_capacity(batch.num_rows());
1027 for row in 0..batch.num_rows() {
1028 let id = string(&batch, "id", row)?.context("message id is null")?;
1029 let role =
1030 role_from_str(&string(&batch, "role", row)?.context("message role is null")?)?;
1031 let timestamp = datetime(&batch, "timestamp", row)?;
1032 rows.push(ScanRow {
1033 id,
1034 role,
1035 timestamp,
1036 text: string(&batch, "search_text", row)?,
1037 content: string(&batch, "content", row)?,
1038 });
1039 }
1040 Ok(rows)
1041 }
1042
1043 pub async fn scan_conversational_messages(
1047 &self,
1048 session_id: &str,
1049 ) -> Result<Vec<ConversationalRow>> {
1050 let filter = Predicate::And(vec![
1051 Predicate::Eq("session_id", session_id.into()),
1052 Predicate::IsNotNull("search_text"),
1053 ]);
1054 let batch = self
1055 .handle
1056 .scan_batch(
1057 Table::Messages,
1058 Some(&filter),
1059 &["id", "timestamp", "role", "search_text"],
1060 )
1061 .await?;
1062
1063 let mut rows = Vec::with_capacity(batch.num_rows());
1064 for row in 0..batch.num_rows() {
1065 let message_id = string(&batch, "id", row)?.context("message id is null")?;
1066 let role =
1067 role_from_str(&string(&batch, "role", row)?.context("message role is null")?)?;
1068 let timestamp = datetime(&batch, "timestamp", row)?;
1069 let text_str = string(&batch, "search_text", row)?.context(
1070 "search_text null after IsNotNull pushdown - storage invariant violated",
1071 )?;
1072 rows.push(ConversationalRow {
1073 session_id: session_id.to_owned(),
1074 message_id,
1075 role,
1076 timestamp,
1077 text: SearchText(text_str),
1078 });
1079 }
1080 rows.sort_by(|a, b| {
1081 a.timestamp
1082 .cmp(&b.timestamp)
1083 .then_with(|| a.message_id.cmp(&b.message_id))
1084 });
1085 Ok(rows)
1086 }
1087
1088 pub async fn session_id_for_message(&self, message_id: &str) -> Result<Option<String>> {
1091 let batch = self
1092 .handle
1093 .scan_batch(
1094 Table::Messages,
1095 Some(&Predicate::Eq("id", message_id.into())),
1096 &["session_id"],
1097 )
1098 .await?;
1099 if batch.num_rows() == 0 {
1100 return Ok(None);
1101 }
1102 string(&batch, "session_id", 0)
1103 }
1104
1105 pub async fn row_counts(&self) -> Result<(usize, usize, usize)> {
1106 self.handle.row_counts().await
1107 }
1108
1109 pub async fn dataset(&self, table: Table) -> Result<Arc<Dataset>> {
1113 Ok(Arc::new(self.handle.dataset(table).await?))
1114 }
1115
1116 pub async fn export_write(&self, name: &str, bytes: &[u8]) -> Result<()> {
1118 self.handle.export_write(name, bytes).await
1119 }
1120
1121 pub async fn export_read(&self, name: &str) -> Result<Vec<u8>> {
1123 self.handle.export_read(name).await
1124 }
1125
1126 pub fn export_local_path(&self, name: &str) -> Option<std::path::PathBuf> {
1128 self.handle.export_local_path(name)
1129 }
1130
1131 pub async fn corpus_stats(&self, include_subagents: bool) -> Result<CorpusStats> {
1137 let scanner = self
1138 .handle
1139 .scan(
1140 Table::Messages,
1141 ScanOpts::project_only(&["source_agent", "project", "session_id"]),
1142 )
1143 .await?;
1144 let mut stream = scanner.try_into_stream().await?;
1145 let mut groups: HashMap<(String, String), GroupAccumulator> = HashMap::new();
1146 while let Some(batch) = stream.next().await {
1147 let batch = batch?;
1148 for row in 0..batch.num_rows() {
1149 let source_agent = string(&batch, "source_agent", row)?.unwrap_or_default();
1150 let project = string(&batch, "project", row)?.unwrap_or_default();
1151 let session_id = string(&batch, "session_id", row)?.unwrap_or_default();
1152 let is_subagent = source_agent.contains('/');
1153 if is_subagent && !include_subagents {
1154 continue;
1155 }
1156 let entry = groups.entry((source_agent, project)).or_default();
1157 entry.messages += 1;
1158 entry.session_ids.insert(session_id);
1159 }
1160 }
1161
1162 let (totals_sessions, totals_messages, totals_parts) = self.handle.row_counts().await?;
1163 let totals = RowTotals {
1164 sessions: totals_sessions as u64,
1165 messages: totals_messages as u64,
1166 parts: totals_parts as u64,
1167 };
1168
1169 let mut by_adapter: BTreeMap<String, Vec<ProjectStats>> = BTreeMap::new();
1170 for ((adapter, project), acc) in groups {
1171 by_adapter.entry(adapter).or_default().push(ProjectStats {
1172 project,
1173 sessions: acc.session_ids.len() as u64,
1174 messages: acc.messages,
1175 });
1176 }
1177
1178 let mut adapters = Vec::with_capacity(by_adapter.len());
1179 for (adapter, mut projects) in by_adapter {
1180 projects.sort_by(|a, b| {
1181 b.messages
1182 .cmp(&a.messages)
1183 .then_with(|| a.project.cmp(&b.project))
1184 });
1185 let sessions: u64 = projects.iter().map(|p| p.sessions).sum();
1186 let messages: u64 = projects.iter().map(|p| p.messages).sum();
1187 adapters.push(AdapterStats {
1188 adapter,
1189 sessions,
1190 messages,
1191 projects,
1192 });
1193 }
1194
1195 Ok(CorpusStats {
1196 data_url: self.handle.location().clone(),
1197 totals,
1198 adapters,
1199 include_subagents,
1200 })
1201 }
1202
1203 pub async fn write_embeddings(&self, rows: &[EmbeddedMessage]) -> Result<()> {
1208 if rows.is_empty() {
1209 return Ok(());
1210 }
1211 let batch = embedding_update_batch(rows)?;
1212 self.handle
1213 .merge_update(Table::Messages, batch, rows.len())
1214 .await?;
1215 Ok(())
1216 }
1217
1218 pub fn pending_embedding_messages(&self) -> impl Stream<Item = Result<PendingMessage>> + '_ {
1221 try_stream! {
1222 let filter = Predicate::And(vec![
1223 Predicate::IsNull("vector"),
1224 Predicate::IsNotNull("search_text"),
1225 ]);
1226 let projection: &[&str] = &["session_id", "id", "search_text"];
1227 let scanner = self
1228 .handle
1229 .scan(
1230 Table::Messages,
1231 ScanOpts::with_predicate_and_projection(&filter, projection),
1232 )
1233 .await?;
1234 let mut batches = scanner
1235 .try_into_stream()
1236 .await
1237 .context("failed to open messages stream")?;
1238 while let Some(batch) = batches.next().await {
1239 let batch = batch?;
1240 for row in 0..batch.num_rows() {
1241 yield PendingMessage {
1242 session_id: string(&batch, "session_id", row)?
1243 .context("session_id is null")?,
1244 id: string(&batch, "id", row)?.context("message id is null")?,
1245 search_text: string(&batch, "search_text", row)?
1246 .context("search_text is null")?,
1247 };
1248 }
1249 }
1250 }
1251 }
1252
1253 pub fn pending_or_stale_messages(&self) -> impl Stream<Item = Result<PendingMessage>> + '_ {
1258 try_stream! {
1259 let filter = Predicate::And(vec![
1260 Predicate::IsNotNull("search_text"),
1261 Predicate::Or(vec![
1262 Predicate::IsNull("vector"),
1263 Predicate::Ne("embedding_model", embed::model_id().into()),
1264 ]),
1265 ]);
1266 let projection: &[&str] = &["session_id", "id", "search_text"];
1267 let scanner = self
1268 .handle
1269 .scan(
1270 Table::Messages,
1271 ScanOpts::with_predicate_and_projection(&filter, projection),
1272 )
1273 .await?;
1274 let mut batches = scanner
1275 .try_into_stream()
1276 .await
1277 .context("failed to open pending-or-stale messages stream")?;
1278 while let Some(batch) = batches.next().await {
1279 let batch = batch?;
1280 for row in 0..batch.num_rows() {
1281 yield PendingMessage {
1282 session_id: string(&batch, "session_id", row)?
1283 .context("session_id is null")?,
1284 id: string(&batch, "id", row)?.context("message id is null")?,
1285 search_text: string(&batch, "search_text", row)?
1286 .context("search_text is null")?,
1287 };
1288 }
1289 }
1290 }
1291 }
1292
1293 pub async fn fts_search(
1295 &self,
1296 query: &str,
1297 limit: usize,
1298 filter: &Predicate,
1299 ) -> Result<Vec<(MessageKey, f32)>> {
1300 let mut scanner = self.handle.scanner(Table::Messages, Some(filter)).await?;
1301 scanner.full_text_search(
1302 FullTextSearchQuery::new(query.to_owned()).with_column("search_text".to_owned())?,
1303 )?;
1304 scanner.disable_scoring_autoprojection();
1310 scanner.project(&["session_id", "id", "_score"])?;
1311 scanner.limit(Some(i64::try_from(limit).unwrap_or(i64::MAX)), None)?;
1312 let batch = scanner.try_into_batch().await?;
1313 let mut hits = Vec::with_capacity(batch.num_rows());
1314 for row in 0..batch.num_rows() {
1315 let key = MessageKey {
1316 session_id: string(&batch, "session_id", row)?.context("session_id is null")?,
1317 message_id: string(&batch, "id", row)?.context("fts hit id is null")?,
1318 };
1319 hits.push((key, float32(&batch, "_score", row)?));
1320 }
1321 hits.sort_by(|left, right| {
1329 right
1330 .1
1331 .partial_cmp(&left.1)
1332 .unwrap_or(std::cmp::Ordering::Equal)
1333 .then_with(|| left.0.session_id.cmp(&right.0.session_id))
1334 .then_with(|| left.0.message_id.cmp(&right.0.message_id))
1335 });
1336 Ok(hits)
1337 }
1338
1339 pub async fn searchable_in_scope(&self, filter: &Predicate) -> Result<usize> {
1346 let scope = Predicate::And(vec![Predicate::IsNotNull("search_text"), filter.clone()]);
1347 let dataset = self.handle.dataset(Table::Messages).await?;
1348 dataset
1349 .count_rows(Some(scope.to_lance()))
1350 .await
1351 .map_err(Into::into)
1352 }
1353
1354 pub async fn has_embeddings(&self) -> Result<bool> {
1359 let scope = Predicate::IsNotNull("vector");
1360 let mut scanner = self
1361 .handle
1362 .scan(
1363 Table::Messages,
1364 ScanOpts::with_predicate_and_projection(&scope, &["id"]),
1365 )
1366 .await?;
1367 scanner.limit(Some(1), None)?;
1368 let batch = scanner.try_into_batch().await?;
1369 Ok(batch.num_rows() > 0)
1370 }
1371
1372 pub async fn vector_search(
1380 &self,
1381 query: &[f32],
1382 limit: usize,
1383 filter: &Predicate,
1384 search: Option<&config::SearchConfig>,
1385 ) -> Result<Vec<(MessageKey, f32)>> {
1386 let scope = embedded_scope(filter);
1387 let mut scanner = self.handle.scanner(Table::Messages, Some(&scope)).await?;
1388 let key = Float32Array::from(query.to_vec());
1389 scanner.nearest("vector", &key, limit)?;
1390 if let Some(nprobes) = search.and_then(|cfg| cfg.nprobes) {
1391 scanner.nprobes(nprobes);
1392 }
1393 if let Some(refine_factor) = search.and_then(|cfg| cfg.refine_factor) {
1394 scanner.refine(refine_factor);
1395 }
1396 scanner.disable_scoring_autoprojection();
1400 scanner.project(&["session_id", "id", "_distance"])?;
1401 let batch = scanner.try_into_batch().await?;
1402 let mut hits = Vec::with_capacity(batch.num_rows());
1403 for row in 0..batch.num_rows() {
1404 let key = MessageKey {
1405 session_id: string(&batch, "session_id", row)?.context("session_id is null")?,
1406 message_id: string(&batch, "id", row)?.context("message id is null")?,
1407 };
1408 hits.push((key, float32(&batch, "_distance", row)?));
1409 }
1410 hits.sort_by(|left, right| {
1416 left.1
1417 .partial_cmp(&right.1)
1418 .unwrap_or(std::cmp::Ordering::Equal)
1419 .then_with(|| left.0.session_id.cmp(&right.0.session_id))
1420 .then_with(|| left.0.message_id.cmp(&right.0.message_id))
1421 });
1422 Ok(hits)
1423 }
1424
1425 pub async fn explain_vector_plan(
1428 &self,
1429 query: &[f32],
1430 limit: usize,
1431 filter: &Predicate,
1432 search: Option<&config::SearchConfig>,
1433 ) -> Result<String> {
1434 let scope = embedded_scope(filter);
1435 let mut scanner = self.handle.scanner(Table::Messages, Some(&scope)).await?;
1436 let key = Float32Array::from(query.to_vec());
1437 scanner.nearest("vector", &key, limit)?;
1438 if let Some(nprobes) = search.and_then(|cfg| cfg.nprobes) {
1439 scanner.nprobes(nprobes);
1440 }
1441 if let Some(refine_factor) = search.and_then(|cfg| cfg.refine_factor) {
1442 scanner.refine(refine_factor);
1443 }
1444 scanner
1445 .explain_plan(true)
1446 .await
1447 .context("explain_plan failed")
1448 }
1449
1450 pub async fn explain_fts_plan(
1451 &self,
1452 query: &str,
1453 limit: usize,
1454 filter: &Predicate,
1455 ) -> Result<String> {
1456 let mut scanner = self.handle.scanner(Table::Messages, Some(filter)).await?;
1457 scanner.full_text_search(
1458 FullTextSearchQuery::new(query.to_owned()).with_column("search_text".to_owned())?,
1459 )?;
1460 scanner.project(&["session_id", "id"])?;
1461 scanner.limit(Some(i64::try_from(limit).unwrap_or(i64::MAX)), None)?;
1462 scanner
1463 .explain_plan(true)
1464 .await
1465 .context("explain_plan failed")
1466 }
1467
1468 pub async fn message_metas_by_keys(&self, keys: &[MessageKey]) -> Result<Vec<MessageMeta>> {
1470 if keys.is_empty() {
1471 return Ok(Vec::new());
1472 }
1473 let wanted = keys.iter().cloned().collect::<HashSet<_>>();
1474 let session_ids = keys
1475 .iter()
1476 .map(|key| key.session_id.clone())
1477 .collect::<Vec<_>>();
1478 let message_ids = keys
1479 .iter()
1480 .map(|key| key.message_id.clone())
1481 .collect::<Vec<_>>();
1482 let predicate = Predicate::And(vec![
1483 in_predicate("session_id", &session_ids),
1484 in_predicate("id", &message_ids),
1485 ]);
1486 let batch = self
1487 .handle
1488 .scan_batch(
1489 Table::Messages,
1490 Some(&predicate),
1491 &[
1492 "id",
1493 "session_id",
1494 "role",
1495 "project",
1496 "source_agent",
1497 "timestamp",
1498 "search_text",
1499 ],
1500 )
1501 .await?;
1502 let mut metas = Vec::with_capacity(batch.num_rows());
1503 for row in 0..batch.num_rows() {
1504 let message_id = string(&batch, "id", row)?.context("id is null")?;
1505 let session_id = string(&batch, "session_id", row)?.context("session_id is null")?;
1506 if !wanted.contains(&MessageKey {
1507 session_id: session_id.clone(),
1508 message_id: message_id.clone(),
1509 }) {
1510 continue;
1511 }
1512 metas.push(MessageMeta {
1513 message_id,
1514 session_id,
1515 role: string(&batch, "role", row)?.context("role is null")?,
1516 project: string(&batch, "project", row)?.context("project is null")?,
1517 source_agent: string(&batch, "source_agent", row)?
1518 .context("source_agent is null")?,
1519 timestamp: datetime(&batch, "timestamp", row)?,
1520 search_text: string(&batch, "search_text", row)?.unwrap_or_default(),
1521 });
1522 }
1523 Ok(metas)
1524 }
1525
1526 pub async fn session_message_counts(
1528 &self,
1529 session_ids: &[String],
1530 ) -> Result<BTreeMap<String, usize>> {
1531 if session_ids.is_empty() {
1532 return Ok(BTreeMap::new());
1533 }
1534 let dataset = self.handle.dataset(Table::Messages).await?;
1535 let mut tasks = tokio::task::JoinSet::new();
1536 for session_id in session_ids {
1537 let dataset = dataset.clone();
1538 let session_id = session_id.clone();
1539 tasks.spawn(async move {
1540 let filter = Predicate::Eq("session_id", session_id.as_str().into()).to_lance();
1541 let count = dataset.count_rows(Some(filter)).await?;
1542 anyhow::Ok((session_id, count))
1543 });
1544 }
1545 let mut counts = BTreeMap::new();
1546 while let Some(joined) = tasks.join_next().await {
1547 let (session_id, count) = joined.context("session count task panicked")??;
1548 counts.insert(session_id, count);
1549 }
1550 Ok(counts)
1551 }
1552
1553 pub async fn unindexed_message_backlog(&self) -> Result<usize> {
1556 self.handle
1557 .unindexed_row_count(Table::Messages, MESSAGES_FTS_INDEX)
1558 .await
1559 }
1560
1561 pub async fn unindexed_vector_backlog(&self) -> Result<usize> {
1567 self.handle
1568 .unindexed_row_count(Table::Messages, MESSAGES_VECTOR_INDEX)
1569 .await
1570 }
1571
1572 pub async fn embedding_progress(&self) -> Result<EmbeddingProgress> {
1579 let dataset = self.handle.dataset(Table::Messages).await?;
1580 let embedded = dataset
1581 .count_rows(Some(Predicate::IsNotNull("vector").to_lance()))
1582 .await?;
1583 let total = dataset
1584 .count_rows(Some(Predicate::IsNotNull("search_text").to_lance()))
1585 .await?;
1586 Ok(EmbeddingProgress {
1587 embedded,
1588 total,
1589 model: embed::model_id(),
1590 })
1591 }
1592
1593 pub async fn stale_embedding_count(&self) -> Result<usize> {
1597 let dataset = self.handle.dataset(Table::Messages).await?;
1598 dataset
1599 .count_rows(Some(
1600 Predicate::And(vec![
1601 Predicate::IsNotNull("vector"),
1602 Predicate::Ne("embedding_model", embed::model_id().into()),
1603 ])
1604 .to_lance(),
1605 ))
1606 .await
1607 .map_err(Into::into)
1608 }
1609
1610 pub async fn optimize_indices(
1616 &self,
1617 progress: Option<OptimizeProgressFn>,
1618 maintenance: &MaintenancePolicy,
1619 ) -> Result<OptimizeOutcome> {
1620 let intents = pond_index_intents();
1621 let mut tables = Vec::with_capacity(3);
1622 for (table, intents) in intents.all() {
1623 let outcome = self
1624 .handle
1625 .optimize_table(table, intents, progress.as_ref(), maintenance)
1626 .await;
1627 tables.push(outcome);
1628 }
1629 Ok(OptimizeOutcome { tables })
1630 }
1631
1632 pub async fn build_indices_only(
1638 &self,
1639 progress: Option<OptimizeProgressFn>,
1640 ) -> Result<OptimizeOutcome> {
1641 let policy = pond_index_intents();
1642 let mut tables = Vec::with_capacity(3);
1643 for (table, intents) in policy.all() {
1644 let indices = self
1645 .handle
1646 .optimize_table_indices_only(table, intents, progress.as_ref())
1647 .await;
1648 tables.push(TableOptimizeOutcome {
1649 table,
1650 indices,
1651 compaction: PhaseOutcome::NotAttempted,
1652 });
1653 }
1654 Ok(OptimizeOutcome { tables })
1655 }
1656
1657 #[cfg(test)]
1658 async fn optimize_indices_with_vector_threshold(
1659 &self,
1660 vector_threshold: usize,
1661 ) -> Result<OptimizeOutcome> {
1662 let intents = pond_index_intents_with_vector_threshold(vector_threshold);
1663 let policy = MaintenancePolicy::always_compact();
1664 let mut tables = Vec::with_capacity(3);
1665 for (table, intents) in intents.all() {
1666 let outcome = self
1667 .handle
1668 .optimize_table(table, intents, None, &policy)
1669 .await;
1670 tables.push(outcome);
1671 }
1672 Ok(OptimizeOutcome { tables })
1673 }
1674
1675 pub async fn rebuild_indices(&self, intent_name: Option<&str>) -> Result<()> {
1676 let policy = pond_index_intents();
1677 let mut matched = false;
1678 for (table, intents) in policy.all() {
1679 for intent in intents {
1680 if intent_name.is_none_or(|name| name == intent.name) {
1681 matched = true;
1682 self.handle.rebuild_index(table, intent).await?;
1683 }
1684 }
1685 }
1686 if let Some(name) = intent_name
1687 && !matched
1688 {
1689 anyhow::bail!("unknown index intent {name:?}");
1690 }
1691 Ok(())
1692 }
1693
1694 pub async fn index_status(&self) -> Result<Vec<IndexStatus>> {
1695 let policy = pond_index_intents();
1696 let mut statuses = Vec::new();
1697 for (table, intents) in policy.all() {
1698 statuses.extend(self.handle.index_status(table, intents).await?);
1699 }
1700 Ok(statuses)
1701 }
1702
1703 pub async fn drop_vector_index(&self) -> Result<()> {
1707 match self
1708 .handle
1709 .drop_index(Table::Messages, MESSAGES_VECTOR_INDEX)
1710 .await
1711 {
1712 Ok(()) => Ok(()),
1713 Err(error) => {
1714 let msg = error.to_string();
1715 if msg.contains("not found") || msg.contains("does not exist") {
1716 Ok(())
1717 } else {
1718 Err(error)
1719 }
1720 }
1721 }
1722 }
1723
1724 pub async fn table_sizes(&self) -> Result<TableSizes> {
1727 self.handle.table_sizes().await
1728 }
1729
1730 async fn find_session(&self, session_id: &str) -> Result<Option<Session>> {
1731 let batch = self
1732 .handle
1733 .scan_batch(
1734 Table::Sessions,
1735 Some(&Predicate::Eq("id", session_id.into())),
1736 &[],
1737 )
1738 .await?;
1739 if batch.num_rows() == 0 {
1740 Ok(None)
1741 } else {
1742 Ok(Some(session_from_batch(&batch, 0)?))
1743 }
1744 }
1745
1746 async fn messages_for_session(&self, session_id: &str) -> Result<Vec<MessageWithParts>> {
1747 let batch = self
1748 .handle
1749 .scan_batch(
1750 Table::Messages,
1751 Some(&Predicate::Eq("session_id", session_id.into())),
1752 &[
1753 "session_id",
1754 "id",
1755 "timestamp",
1756 "role",
1757 "content",
1758 "options",
1759 ],
1760 )
1761 .await?;
1762 let mut messages = Vec::with_capacity(batch.num_rows());
1763 for row in 0..batch.num_rows() {
1764 messages.push(message_from_batch(&batch, row)?);
1765 }
1766 messages.sort_by(|left, right| {
1767 left.timestamp()
1768 .cmp(&right.timestamp())
1769 .then_with(|| left.id().cmp(right.id()))
1770 });
1771
1772 let message_ids = messages
1773 .iter()
1774 .map(|message| message.id().to_owned())
1775 .collect::<Vec<_>>();
1776 let mut parts_by_message = self.parts_for_messages(session_id, &message_ids).await?;
1777
1778 Ok(messages
1779 .into_iter()
1780 .map(|message| {
1781 let key = (message.session_id().to_owned(), message.id().to_owned());
1782 let parts = parts_by_message.remove(&key).unwrap_or_default();
1783 MessageWithParts { message, parts }
1784 })
1785 .collect())
1786 }
1787
1788 pub async fn parts_for_messages(
1792 &self,
1793 session_id: &str,
1794 message_ids: &[String],
1795 ) -> Result<BTreeMap<(String, String), Vec<Part>>> {
1796 self.scan_parts(session_id, message_ids, None).await
1797 }
1798
1799 pub async fn summary_parts_for_messages(
1804 &self,
1805 session_id: &str,
1806 message_ids: &[String],
1807 ) -> Result<BTreeMap<(String, String), Vec<Part>>> {
1808 self.scan_parts(session_id, message_ids, Some(SUMMARY_PART_TYPES))
1809 .await
1810 }
1811
1812 async fn scan_parts(
1813 &self,
1814 session_id: &str,
1815 message_ids: &[String],
1816 part_types: Option<&[&str]>,
1817 ) -> Result<BTreeMap<(String, String), Vec<Part>>> {
1818 if message_ids.is_empty() {
1819 return Ok(BTreeMap::new());
1820 }
1821 let mut clauses = vec![
1822 Predicate::Eq("session_id", session_id.into()),
1823 in_predicate("message_id", message_ids),
1824 ];
1825 if let Some(types) = part_types {
1826 clauses.push(Predicate::In(
1827 "type",
1828 types.iter().map(|&t| t.into()).collect(),
1829 ));
1830 }
1831 let predicate = Predicate::And(clauses);
1832 let dataset = std::sync::Arc::new(self.handle.dataset(Table::Parts).await?);
1833 let mut scanner = self
1834 .handle
1835 .scan(
1836 Table::Parts,
1837 ScanOpts::with_predicate_and_projection(
1838 &predicate,
1839 &[
1840 "session_id",
1841 "message_id",
1842 "id",
1843 "ordinal",
1844 "type",
1845 "provenance",
1846 "variant_data",
1847 "options",
1848 ],
1849 ),
1850 )
1851 .await?;
1852 scanner.with_row_address();
1853 let batch = scanner.try_into_batch().await.context("scan failed")?;
1854 let row_addresses = uint64(&batch, "_rowaddr")?;
1855 let mut file_payloads = BTreeMap::<usize, FileData>::new();
1856 let mut file_rows = Vec::<(usize, u64, Vec<u8>)>::new();
1857 for row in 0..batch.num_rows() {
1858 if string(&batch, "type", row)?.as_deref() == Some("file") {
1859 let variant_data =
1860 json_column(&batch, "variant_data", row)?.context("variant_data is null")?;
1861 file_rows.push((row, row_addresses.value(row), variant_data));
1862 }
1863 }
1864 if !file_rows.is_empty() {
1865 let addresses = file_rows
1866 .iter()
1867 .map(|(_, address, _)| *address)
1868 .collect::<Vec<_>>();
1869 let blobs = dataset.take_blobs_by_addresses(&addresses, "data").await?;
1870 for ((row, _, variant_data), blob) in file_rows.into_iter().zip(blobs) {
1871 let payload = file_data_from_blob(&variant_data, &blob.read().await?)?;
1875 file_payloads.insert(row, payload);
1876 }
1877 }
1878 let mut parts_by_message = BTreeMap::<(String, String), Vec<Part>>::new();
1879 for row in 0..batch.num_rows() {
1880 let part = part_from_batch(&batch, row, file_payloads.remove(&row))?;
1881 parts_by_message
1882 .entry((part.session_id.clone(), part.message_id.clone()))
1883 .or_default()
1884 .push(part);
1885 }
1886 for parts in parts_by_message.values_mut() {
1887 parts.sort_by_key(|part| part.ordinal);
1888 }
1889 Ok(parts_by_message)
1890 }
1891}
1892
/// One event of an ingest stream.
///
/// Serialized with an adjacent tag: the variant name (snake_case) goes under
/// `"kind"` and the payload under `"data"`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "kind", content = "data", rename_all = "snake_case")]
pub enum IngestEvent {
    /// Opens a new session substream.
    Session(Session),
    /// A message belonging to the currently open session.
    Message(Message),
    /// A part belonging to the current message.
    Part(Part),
}
1900
/// Running totals for an ingest run.
///
/// Updated from per-batch counts (`add_batch`), per-row outcomes
/// (`add_outcomes`, `add_outcomes_errors_only`) and other summaries (`merge`).
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct IngestSummary {
    /// Rows inserted, across sessions, messages and parts.
    pub inserted: usize,
    /// Rows reported as matched, across sessions, messages and parts.
    pub matched: usize,
    /// Sessions inserted.
    pub sessions_inserted: usize,
    /// Messages inserted, searchable or not.
    pub messages_inserted_total: usize,
    /// Inserted messages whose outcome was flagged searchable.
    pub messages_inserted_searchable: usize,
    /// Parts inserted.
    pub parts_inserted: usize,
    /// Sessions reported as matched.
    pub sessions_matched: usize,
    /// Messages reported as matched, searchable or not.
    pub messages_matched_total: usize,
    /// Matched messages whose outcome was flagged searchable.
    pub messages_matched_searchable: usize,
    /// Parts reported as matched.
    pub parts_matched: usize,
    /// Error outcomes whose kind is anything other than `"session"`.
    pub dropped_events: usize,
    /// Error outcomes whose kind is `"session"`.
    pub dropped_sessions: usize,
    // NOTE(review): the five counters below are only touched by `merge` in this
    // part of the file; they are presumably maintained by callers — confirm.
    pub skipped_files: usize,
    pub skipped_empty: usize,
    pub skipped_fresh: usize,
    pub storage_errors: usize,
    pub truncated_values: usize,
    /// Error outcomes counted per reason key (one of the `DROP_REASON_*`
    /// constants, or `DROP_REASON_UNCATEGORIZED` when the error has no key).
    pub drop_reasons: BTreeMap<&'static str, usize>,
}
1976
// Reason keys recorded in `IngestSummary::drop_reasons` for rejected rows.

/// A message id was repeated within one session substream.
pub const DROP_REASON_DUPLICATE_MESSAGE_ID: &str = "duplicate_message_id";
/// A `(message_id, part id)` pair was repeated within one session substream.
pub const DROP_REASON_DUPLICATE_PART_KEY: &str = "duplicate_part_key";
/// A message arrived while no session was open.
pub const DROP_REASON_MESSAGE_BEFORE_SESSION: &str = "message_before_session";
/// A message named a session other than the open one.
pub const DROP_REASON_MESSAGE_SESSION_MISMATCH: &str = "message_session_mismatch";
/// A part arrived while no message was current.
pub const DROP_REASON_PART_BEFORE_MESSAGE: &str = "part_before_message";
/// A part named a session or message other than the current message's.
pub const DROP_REASON_PART_MESSAGE_MISMATCH: &str = "part_message_mismatch";
/// A session's `source_agent` was empty after trimming.
pub const DROP_REASON_EMPTY_SOURCE_AGENT: &str = "empty_source_agent";
/// A session had `parent_message_id` set without `parent_session_id`.
pub const DROP_REASON_PARENT_MESSAGE_WITHOUT_SESSION: &str = "parent_message_without_session";
// NOTE(review): the two `immutable_*` keys are not used in this part of the
// file; presumably they mark attempts to change a stored session's `project`
// or `source_agent` — confirm at the use site.
pub const DROP_REASON_IMMUTABLE_PROJECT: &str = "immutable_project";
pub const DROP_REASON_IMMUTABLE_SOURCE_AGENT: &str = "immutable_source_agent";
/// Fallback key for an error outcome that carries no reason key.
pub const DROP_REASON_UNCATEGORIZED: &str = "uncategorized";
1993
/// Per-kind inserted/matched counts for one batch.
///
/// Folded into an [`IngestSummary`] by `IngestSummary::add_batch`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct BatchCounts {
    /// Sessions inserted by the batch.
    pub sessions_inserted: usize,
    /// Sessions the batch reported as matched.
    pub sessions_matched: usize,
    /// Messages inserted, searchable or not.
    pub messages_inserted_total: usize,
    /// Searchable subset of the inserted messages.
    pub messages_inserted_searchable: usize,
    /// Messages reported as matched, searchable or not.
    pub messages_matched_total: usize,
    /// Searchable subset of the matched messages.
    pub messages_matched_searchable: usize,
    /// Parts inserted by the batch.
    pub parts_inserted: usize,
    /// Parts the batch reported as matched.
    pub parts_matched: usize,
}
2012
2013impl IngestSummary {
2014 pub fn accepted(&self) -> usize {
2015 self.inserted + self.matched
2016 }
2017
2018 pub fn add_batch(&mut self, counts: &BatchCounts) {
2022 self.sessions_inserted += counts.sessions_inserted;
2023 self.sessions_matched += counts.sessions_matched;
2024 self.messages_inserted_total += counts.messages_inserted_total;
2025 self.messages_inserted_searchable += counts.messages_inserted_searchable;
2026 self.messages_matched_total += counts.messages_matched_total;
2027 self.messages_matched_searchable += counts.messages_matched_searchable;
2028 self.parts_inserted += counts.parts_inserted;
2029 self.parts_matched += counts.parts_matched;
2030 self.inserted +=
2031 counts.sessions_inserted + counts.messages_inserted_total + counts.parts_inserted;
2032 self.matched +=
2033 counts.sessions_matched + counts.messages_matched_total + counts.parts_matched;
2034 }
2035
2036 pub fn merge(&mut self, other: &Self) {
2040 self.inserted += other.inserted;
2041 self.matched += other.matched;
2042 self.sessions_inserted += other.sessions_inserted;
2043 self.messages_inserted_total += other.messages_inserted_total;
2044 self.messages_inserted_searchable += other.messages_inserted_searchable;
2045 self.parts_inserted += other.parts_inserted;
2046 self.sessions_matched += other.sessions_matched;
2047 self.messages_matched_total += other.messages_matched_total;
2048 self.messages_matched_searchable += other.messages_matched_searchable;
2049 self.parts_matched += other.parts_matched;
2050 self.dropped_events += other.dropped_events;
2051 self.dropped_sessions += other.dropped_sessions;
2052 self.skipped_files += other.skipped_files;
2053 self.skipped_empty += other.skipped_empty;
2054 self.skipped_fresh += other.skipped_fresh;
2055 self.storage_errors += other.storage_errors;
2056 self.truncated_values += other.truncated_values;
2057 for (key, value) in &other.drop_reasons {
2058 *self.drop_reasons.entry(key).or_insert(0) += value;
2059 }
2060 }
2061
2062 pub fn add_outcomes_errors_only(&mut self, outcomes: &[RowOutcome]) {
2067 for outcome in outcomes {
2068 if !matches!(outcome.status, OutcomeStatus::Error) {
2069 continue;
2070 }
2071 if outcome.kind == "session" {
2072 self.dropped_sessions += 1;
2073 } else {
2074 self.dropped_events += 1;
2075 }
2076 let reason = outcome
2077 .error
2078 .as_ref()
2079 .and_then(|error| error.reason_key)
2080 .unwrap_or(DROP_REASON_UNCATEGORIZED);
2081 *self.drop_reasons.entry(reason).or_insert(0) += 1;
2082 }
2083 }
2084
2085 pub fn add_outcomes(&mut self, outcomes: &[RowOutcome]) {
2086 for outcome in outcomes {
2087 match outcome.status {
2088 OutcomeStatus::Inserted => {
2089 self.inserted += 1;
2090 match outcome.kind {
2091 "session" => self.sessions_inserted += 1,
2092 "message" => {
2093 self.messages_inserted_total += 1;
2094 if outcome.searchable {
2095 self.messages_inserted_searchable += 1;
2096 }
2097 }
2098 "part" => self.parts_inserted += 1,
2099 _ => {}
2100 }
2101 }
2102 OutcomeStatus::Matched => {
2103 self.matched += 1;
2104 match outcome.kind {
2105 "session" => self.sessions_matched += 1,
2106 "message" => {
2107 self.messages_matched_total += 1;
2108 if outcome.searchable {
2109 self.messages_matched_searchable += 1;
2110 }
2111 }
2112 "part" => self.parts_matched += 1,
2113 _ => {}
2114 }
2115 }
2116 OutcomeStatus::Error => {
2117 if outcome.kind == "session" {
2123 self.dropped_sessions += 1;
2124 } else {
2125 self.dropped_events += 1;
2126 }
2127 let reason = outcome
2128 .error
2129 .as_ref()
2130 .and_then(|e| e.reason_key)
2131 .unwrap_or(DROP_REASON_UNCATEGORIZED);
2132 *self.drop_reasons.entry(reason).or_insert(0) += 1;
2133 }
2134 }
2135 }
2136 }
2137}
2138
/// The result of ingesting a single event row.
#[derive(Debug, Clone, PartialEq)]
pub struct RowOutcome {
    /// The index the caller supplied with the event.
    pub index: usize,
    /// Row kind; the code in this file uses `"session"`, `"message"` and `"part"`.
    pub kind: &'static str,
    /// Primary key of the row as JSON: the session id string for a session,
    /// `[session_id, id]` for a message, `[session_id, message_id, id]` for a part.
    pub pk: Value,
    /// Whether the row was inserted, matched, or rejected.
    pub status: OutcomeStatus,
    /// Error details; the error outcomes built in this file always set it.
    pub error: Option<RowError>,
    /// Counted by `IngestSummary::add_outcomes` for message rows to track the
    /// searchable subset; error outcomes built in this file set it to `false`.
    pub searchable: bool,
}
2156
/// How a row fared during ingest.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OutcomeStatus {
    /// Counted under the `*_inserted` totals.
    Inserted,
    /// Counted under the `*_matched` totals.
    // NOTE(review): presumably means the row already existed — confirm.
    Matched,
    /// The row was rejected; counted as a dropped session or dropped event.
    Error,
}
2163
/// Details of why a row was rejected.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RowError {
    /// Human-readable description of the failure.
    pub message: String,
    /// Name of the offending field, when the failure is tied to one.
    pub field: Option<&'static str>,
    /// Short reason label; `error_outcomes_for_substream` sets it to
    /// `"immutable"` when a field is given, other builders here leave it `None`.
    pub reason: Option<&'static str>,
    /// Key under which the failure is tallied in `IngestSummary::drop_reasons`.
    pub reason_key: Option<&'static str>,
}
2177
/// A session event held by the validator while its substream is open.
#[derive(Debug)]
struct BufferedSession {
    /// Index supplied with the session event.
    index: usize,
    /// The session itself (with `source_agent` already trimmed).
    session: Session,
}
2186
/// A message event buffered by the validator, with the parts that followed it.
#[derive(Debug)]
struct BufferedMessage {
    /// Index supplied with the message event.
    index: usize,
    /// The message itself.
    message: Message,
    /// Parts collected for this message; filled in when the message is flushed.
    parts: Vec<BufferedPart>,
    /// Result of `search_text` over the message and its parts; computed when
    /// the message is flushed, `None` until then.
    search_text: Option<String>,
}
2194
/// A part event buffered by the validator.
#[derive(Debug)]
struct BufferedPart {
    /// Index supplied with the part event.
    index: usize,
    /// The part itself.
    part: Part,
}
2200
/// Validates the ordering and consistency of an ingest event stream and
/// buffers accepted events until they are flushed to the store.
///
/// Events are expected as a session, then its messages, each followed by its
/// parts. A new session event closes the previous substream.
#[derive(Debug, Default)]
pub struct IngestValidator {
    /// The currently open session, if any.
    session: Option<BufferedSession>,
    /// The message currently collecting parts, if any.
    current_message: Option<BufferedMessage>,
    /// Parts received for `current_message` so far.
    current_parts: Vec<BufferedPart>,
    /// Finished messages of the open substream.
    messages: Vec<BufferedMessage>,
    /// Message ids seen in the open substream, for duplicate detection.
    seen_message_ids: HashSet<String>,
    /// `(message_id, part id)` pairs seen in the open substream, for duplicate
    /// detection.
    seen_part_keys: HashSet<(String, String)>,
    /// Closed substreams waiting to be written by `flush`.
    completed: Vec<CompletedSubstream>,
}
2235
/// A closed session substream: the session plus all of its buffered messages.
#[derive(Debug)]
struct CompletedSubstream {
    /// Index supplied with the session event.
    session_index: usize,
    /// The session that opened the substream.
    session: Session,
    /// Messages (with their parts) received while the session was open.
    messages: Vec<BufferedMessage>,
}
2243
2244fn ingest_host_stamp() -> Option<&'static Value> {
2249 static STAMP: std::sync::OnceLock<Option<Value>> = std::sync::OnceLock::new();
2250 STAMP
2251 .get_or_init(|| {
2252 let mut host = serde_json::Map::new();
2253 if let Ok(username) = whoami::username() {
2254 host.insert("username".to_owned(), username.into());
2255 }
2256 if let Ok(hostname) = whoami::hostname() {
2257 host.insert("hostname".to_owned(), hostname.into());
2258 }
2259 if let Ok(devicename) = whoami::devicename() {
2260 host.insert("device_name".to_owned(), devicename.into());
2261 }
2262 (!host.is_empty()).then(|| serde_json::json!({ "ingest": { "host": host } }))
2263 })
2264 .as_ref()
2265}
2266
2267impl IngestValidator {
2268 pub async fn push(
2274 &mut self,
2275 store: &Store,
2276 index: usize,
2277 event: IngestEvent,
2278 ) -> Result<Vec<RowOutcome>> {
2279 match event {
2280 IngestEvent::Session(session) => self.push_session(store, index, session).await,
2281 IngestEvent::Message(message) => Ok(self.push_message(index, message)),
2282 IngestEvent::Part(part) => Ok(self.push_part(index, part)),
2283 }
2284 }
2285
2286 pub async fn finish(&mut self, store: &Store) -> Result<(Vec<RowOutcome>, BatchCounts)> {
2291 self.close_current_substream();
2292 self.flush(store).await
2293 }
2294
2295 pub async fn flush(&mut self, store: &Store) -> Result<(Vec<RowOutcome>, BatchCounts)> {
2302 if self.completed.is_empty() {
2303 return Ok((Vec::new(), BatchCounts::default()));
2304 }
2305 let completed = std::mem::take(&mut self.completed);
2306 store.upsert_session_batch(completed).await
2307 }
2308
2309 pub fn pending_substreams(&self) -> usize {
2312 self.completed.len()
2313 }
2314
2315 async fn push_session(
2316 &mut self,
2317 _store: &Store,
2318 index: usize,
2319 mut session: Session,
2320 ) -> Result<Vec<RowOutcome>> {
2321 self.close_current_substream();
2325
2326 let trimmed = session.source_agent.trim();
2331 if trimmed.is_empty() {
2332 return Ok(vec![RowOutcome {
2333 index,
2334 kind: "session",
2335 pk: Value::String(session.id.clone()),
2336 status: OutcomeStatus::Error,
2337 error: Some(RowError {
2338 message: format!("session {} has empty source_agent after trim", session.id),
2339 field: Some("source_agent"),
2340 reason: None,
2341 reason_key: Some(DROP_REASON_EMPTY_SOURCE_AGENT),
2342 }),
2343 searchable: false,
2344 }]);
2345 }
2346 if trimmed.len() != session.source_agent.len() {
2347 session.source_agent = trimmed.to_owned();
2348 }
2349
2350 if session.parent_message_id.is_some() && session.parent_session_id.is_none() {
2351 return Ok(vec![RowOutcome {
2352 index,
2353 kind: "session",
2354 pk: Value::String(session.id.clone()),
2355 status: OutcomeStatus::Error,
2356 error: Some(RowError {
2357 message: format!(
2358 "session {} has parent_message_id without parent_session_id",
2359 session.id,
2360 ),
2361 field: Some("parent_message_id"),
2362 reason: None,
2363 reason_key: Some(DROP_REASON_PARENT_MESSAGE_WITHOUT_SESSION),
2364 }),
2365 searchable: false,
2366 }]);
2367 }
2368
2369 self.seen_message_ids.clear();
2370 self.seen_part_keys.clear();
2371 self.session = Some(BufferedSession { index, session });
2372 Ok(Vec::new())
2373 }
2374
2375 fn close_current_substream(&mut self) {
2376 self.flush_current_message();
2377 let Some(BufferedSession {
2378 index: session_index,
2379 session,
2380 }) = self.session.take()
2381 else {
2382 return;
2383 };
2384 let messages = std::mem::take(&mut self.messages);
2385 self.seen_message_ids.clear();
2386 self.seen_part_keys.clear();
2387 self.completed.push(CompletedSubstream {
2388 session_index,
2389 session,
2390 messages,
2391 });
2392 }
2393
2394 fn push_message(&mut self, index: usize, mut message: Message) -> Vec<RowOutcome> {
2395 let pk = Value::Array(vec![
2396 Value::String(message.session_id().to_owned()),
2397 Value::String(message.id().to_owned()),
2398 ]);
2399 let Some(session) = &self.session else {
2400 return vec![error_outcome(
2401 index,
2402 "message",
2403 pk,
2404 "first event in a session stream must be Session",
2405 None,
2406 DROP_REASON_MESSAGE_BEFORE_SESSION,
2407 )];
2408 };
2409 if message.session_id() != session.session.id {
2410 let msg = format!(
2411 "message {} references session {}, expected {}",
2412 message.id(),
2413 message.session_id(),
2414 session.session.id
2415 );
2416 return vec![error_outcome(
2417 index,
2418 "message",
2419 pk,
2420 &msg,
2421 Some("session_id"),
2422 DROP_REASON_MESSAGE_SESSION_MISMATCH,
2423 )];
2424 }
2425 if !self.seen_message_ids.insert(message.id().to_owned()) {
2426 let msg = format!("duplicate message id {} in session substream", message.id());
2430 return vec![error_outcome(
2431 index,
2432 "message",
2433 pk,
2434 &msg,
2435 None,
2436 DROP_REASON_DUPLICATE_MESSAGE_ID,
2437 )];
2438 }
2439 match ingest_host_stamp() {
2444 Some(stamp) => {
2445 message
2446 .options_mut()
2447 .insert("pond".to_owned(), stamp.clone());
2448 }
2449 None => {
2450 message.options_mut().remove("pond");
2451 }
2452 }
2453 self.flush_current_message();
2454 self.current_message = Some(BufferedMessage {
2455 index,
2456 message,
2457 parts: Vec::new(),
2458 search_text: None,
2459 });
2460 Vec::new()
2461 }
2462
2463 fn push_part(&mut self, index: usize, part: Part) -> Vec<RowOutcome> {
2464 let pk = Value::Array(vec![
2465 Value::String(part.session_id.clone()),
2466 Value::String(part.message_id.clone()),
2467 Value::String(part.id.clone()),
2468 ]);
2469 let Some(current) = &self.current_message else {
2470 return vec![error_outcome(
2471 index,
2472 "part",
2473 pk,
2474 "part event appeared before a message",
2475 None,
2476 DROP_REASON_PART_BEFORE_MESSAGE,
2477 )];
2478 };
2479 if part.session_id != current.message.session_id() {
2480 let msg = format!(
2481 "part {} references session {}, expected {}",
2482 part.id,
2483 part.session_id,
2484 current.message.session_id()
2485 );
2486 return vec![error_outcome(
2487 index,
2488 "part",
2489 pk,
2490 &msg,
2491 Some("session_id"),
2492 DROP_REASON_PART_MESSAGE_MISMATCH,
2493 )];
2494 }
2495 if part.message_id != current.message.id() {
2496 let msg = format!(
2497 "part {} references message {}, expected {}",
2498 part.id,
2499 part.message_id,
2500 current.message.id()
2501 );
2502 return vec![error_outcome(
2503 index,
2504 "part",
2505 pk,
2506 &msg,
2507 Some("message_id"),
2508 DROP_REASON_PART_MESSAGE_MISMATCH,
2509 )];
2510 }
2511 let part_key = (part.message_id.clone(), part.id.clone());
2512 if !self.seen_part_keys.insert(part_key) {
2513 let msg = format!(
2514 "duplicate part id {} for message {} in session substream",
2515 part.id, part.message_id
2516 );
2517 return vec![error_outcome(
2518 index,
2519 "part",
2520 pk,
2521 &msg,
2522 None,
2523 DROP_REASON_DUPLICATE_PART_KEY,
2524 )];
2525 }
2526 self.current_parts.push(BufferedPart { index, part });
2527 Vec::new()
2528 }
2529
2530 fn flush_current_message(&mut self) {
2531 let Some(mut buffered) = self.current_message.take() else {
2532 return;
2533 };
2534 let parts = std::mem::take(&mut self.current_parts);
2535 let mut canonical_parts = Vec::with_capacity(parts.len());
2536 for part in &parts {
2537 canonical_parts.push(part.part.clone());
2538 }
2539 buffered.search_text = search_text(&buffered.message, &canonical_parts);
2540 buffered.parts = parts;
2541 self.messages.push(buffered);
2542 }
2543}
2544
2545fn error_outcome(
2546 index: usize,
2547 kind: &'static str,
2548 pk: Value,
2549 message: &str,
2550 field: Option<&'static str>,
2551 reason_key: &'static str,
2552) -> RowOutcome {
2553 RowOutcome {
2554 index,
2555 kind,
2556 pk,
2557 status: OutcomeStatus::Error,
2558 error: Some(RowError {
2559 message: message.to_owned(),
2560 field,
2561 reason: None,
2562 reason_key: Some(reason_key),
2563 }),
2564 searchable: false,
2565 }
2566}
2567
2568fn error_outcomes_for_substream(
2573 session_index: usize,
2574 session: &Session,
2575 _messages: &[BufferedMessage],
2576 message: impl Into<String>,
2577 field: Option<&'static str>,
2578 reason_key: &'static str,
2579) -> Vec<RowOutcome> {
2580 let reason = field.map(|_| "immutable");
2581 vec![RowOutcome {
2582 index: session_index,
2583 kind: "session",
2584 pk: Value::String(session.id.clone()),
2585 status: OutcomeStatus::Error,
2586 error: Some(RowError {
2587 message: message.into(),
2588 field,
2589 reason,
2590 reason_key: Some(reason_key),
2591 }),
2592 searchable: false,
2593 }]
2594}
2595
2596fn success_outcomes_for_substream(
2602 session_index: usize,
2603 session: &Session,
2604 messages: &[BufferedMessage],
2605 existing_sessions: &std::collections::HashMap<String, Session>,
2606 existing_message_pks: &HashSet<(String, String)>,
2607 existing_part_pks: &HashSet<(String, String, String)>,
2608 counts: &mut BatchCounts,
2609) -> Vec<RowOutcome> {
2610 let session_was_present = existing_sessions.contains_key(&session.id);
2611 let session_status = if session_was_present {
2612 counts.sessions_matched += 1;
2613 UpsertStatus::Matched
2614 } else {
2615 counts.sessions_inserted += 1;
2616 UpsertStatus::Inserted
2617 };
2618
2619 let mut outcomes = Vec::with_capacity(1 + messages.len());
2620 outcomes.push(success_outcome(
2621 session_index,
2622 "session",
2623 Value::String(session.id.clone()),
2624 session_status,
2625 false,
2626 ));
2627 for buffered in messages {
2628 let key = (
2629 buffered.message.session_id().to_owned(),
2630 buffered.message.id().to_owned(),
2631 );
2632 let searchable = buffered.search_text.is_some();
2633 let message_status = if existing_message_pks.contains(&key) {
2634 counts.messages_matched_total += 1;
2635 if searchable {
2636 counts.messages_matched_searchable += 1;
2637 }
2638 UpsertStatus::Matched
2639 } else {
2640 counts.messages_inserted_total += 1;
2641 if searchable {
2642 counts.messages_inserted_searchable += 1;
2643 }
2644 UpsertStatus::Inserted
2645 };
2646 let pk = Value::Array(vec![Value::String(key.0), Value::String(key.1)]);
2647 outcomes.push(success_outcome(
2648 buffered.index,
2649 "message",
2650 pk,
2651 message_status,
2652 searchable,
2653 ));
2654 for part in &buffered.parts {
2655 let part_key = (
2656 part.part.session_id.clone(),
2657 part.part.message_id.clone(),
2658 part.part.id.clone(),
2659 );
2660 let part_status = if existing_part_pks.contains(&part_key) {
2661 counts.parts_matched += 1;
2662 UpsertStatus::Matched
2663 } else {
2664 counts.parts_inserted += 1;
2665 UpsertStatus::Inserted
2666 };
2667 let part_pk = Value::Array(vec![
2668 Value::String(part_key.0),
2669 Value::String(part_key.1),
2670 Value::String(part_key.2),
2671 ]);
2672 outcomes.push(success_outcome(
2673 part.index,
2674 "part",
2675 part_pk,
2676 part_status,
2677 false,
2678 ));
2679 }
2680 }
2681 outcomes
2682}
2683
2684fn success_outcome(
2685 index: usize,
2686 kind: &'static str,
2687 pk: Value,
2688 status: UpsertStatus,
2689 searchable: bool,
2690) -> RowOutcome {
2691 let status = match status {
2692 UpsertStatus::Inserted => OutcomeStatus::Inserted,
2693 UpsertStatus::Matched => OutcomeStatus::Matched,
2694 };
2695 RowOutcome {
2696 index,
2697 kind,
2698 pk,
2699 status,
2700 error: None,
2701 searchable,
2702 }
2703}
2704
/// Errors raised while validating incoming rows against stored state.
#[derive(Debug, Clone, PartialEq, Eq)]
enum IngestError {
    /// An incoming session tried to change a field that must keep its stored value.
    ImmutableField {
        /// Name of the field whose value differs.
        field: &'static str,
        /// Id of the session the conflicting write targeted.
        session_id: String,
        /// Value currently stored for `field`.
        stored: String,
        /// Value the incoming session carried for `field`.
        attempted: String,
    },
}

impl std::fmt::Display for IngestError {
    /// Renders the error as a single line; `stored` and `attempted` are
    /// printed with `Debug` formatting, so they appear quoted and escaped.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::ImmutableField {
                field,
                session_id,
                stored,
                attempted,
            } => write!(
                formatter,
                "session {session_id} {field} is immutable: stored {stored:?}, attempted {attempted:?}",
            ),
        }
    }
}

// No source error is wrapped, so the default `Error` methods suffice.
impl std::error::Error for IngestError {}
2736
2737fn ensure_immutable_match(
2741 existing: &Session,
2742 incoming: &Session,
2743) -> std::result::Result<(), IngestError> {
2744 if existing.source_agent != incoming.source_agent {
2745 return Err(IngestError::ImmutableField {
2746 field: "source_agent",
2747 session_id: incoming.id.clone(),
2748 stored: existing.source_agent.clone(),
2749 attempted: incoming.source_agent.clone(),
2750 });
2751 }
2752 if existing.project != incoming.project {
2753 return Err(IngestError::ImmutableField {
2754 field: "project",
2755 session_id: incoming.id.clone(),
2756 stored: (*existing.project).clone(),
2757 attempted: (*incoming.project).clone(),
2758 });
2759 }
2760 Ok(())
2761}
2762
2763pub fn search_text(message: &Message, parts: &[Part]) -> Option<String> {
2764 use crate::wire::Provenance;
2765 let mut chunks: Vec<String> = Vec::new();
2766 for part in parts {
2767 if part.provenance != Provenance::Conversational {
2770 continue;
2771 }
2772 match (message.role(), &part.kind) {
2773 (Role::User | Role::Assistant, PartKind::Text { text }) => {
2774 if let Some(text) = text {
2775 chunks.push(text.to_string());
2776 }
2777 }
2778 (
2779 Role::User | Role::Assistant,
2780 PartKind::File {
2781 media_type,
2782 file_name,
2783 data,
2784 },
2785 ) => {
2786 if let Some(file_name) = file_name {
2787 chunks.push(file_name.clone());
2788 }
2789 if let Some(media_type) = media_type {
2790 chunks.push(media_type.clone());
2791 }
2792 if let FileData::Url(uri) = data {
2793 chunks.push(uri.clone());
2794 }
2795 }
2796 (
2797 Role::System | Role::Tool,
2798 PartKind::Text { .. }
2799 | PartKind::Reasoning { .. }
2800 | PartKind::File { .. }
2801 | PartKind::ToolCall { .. }
2802 | PartKind::ToolResult { .. }
2803 | PartKind::ToolApprovalRequest { .. }
2804 | PartKind::ToolApprovalResponse { .. },
2805 )
2806 | (
2807 Role::User | Role::Assistant,
2808 PartKind::Reasoning { .. }
2809 | PartKind::ToolCall { .. }
2810 | PartKind::ToolResult { .. }
2811 | PartKind::ToolApprovalRequest { .. }
2812 | PartKind::ToolApprovalResponse { .. },
2813 ) => {}
2814 }
2815 }
2816
2817 let text = chunks
2818 .into_iter()
2819 .filter(|chunk| !chunk.trim().is_empty())
2820 .collect::<Vec<_>>()
2821 .join("\n");
2822 if text.is_empty() { None } else { Some(text) }
2823}
2824
/// Newtype around the search text of a message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SearchText(String);

impl SearchText {
    /// Borrows the wrapped text.
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }

    /// Consumes the wrapper and hands back the owned text.
    pub fn into_inner(self) -> String {
        let Self(text) = self;
        text
    }
}

impl AsRef<str> for SearchText {
    fn as_ref(&self) -> &str {
        self.as_str()
    }
}
2844
/// A message bundled with its parts.
#[derive(Debug, Clone, PartialEq)]
pub struct MessageWithParts {
    /// The message itself.
    pub message: Message,
    /// The parts belonging to `message`.
    pub parts: Vec<Part>,
}
2850
/// A session bundled with its messages, each carrying its own parts.
#[derive(Debug, Clone, PartialEq)]
pub struct SessionWithMessages {
    /// The session itself.
    pub session: Session,
    /// The session's messages together with their parts.
    pub messages: Vec<MessageWithParts>,
}
2856
/// Parameters for a paged view of a session.
// NOTE(review): field semantics below are inferred from names and from
// `page_by` in this file — confirm against the code that consumes them.
#[derive(Debug, Clone)]
pub struct SessionViewParams<'a> {
    /// Response mode requested by the caller.
    pub mode: ResponseMode,
    /// Optional cursor id; presumably the page starts after this id.
    pub after_id: Option<&'a str>,
    /// Maximum number of items requested.
    pub limit: usize,
    /// Byte budget for the page.
    pub budget_bytes: usize,
    /// Session starting-point selector.
    pub session_from: SessionFrom,
}
2865
/// Parameters for a paged view of a single message.
// NOTE(review): field semantics below are inferred from names and from
// `page_by` in this file — confirm against the code that consumes them.
#[derive(Debug, Clone)]
pub struct MessageViewParams<'a> {
    /// How much surrounding context to include; presumably a count of
    /// neighbouring messages — TODO confirm.
    pub context_depth: usize,
    /// Response mode requested by the caller.
    pub mode: ResponseMode,
    /// Optional cursor id; presumably the page starts after this id.
    pub after_id: Option<&'a str>,
    /// Maximum number of items requested.
    pub limit: usize,
    /// Byte budget for the page.
    pub budget_bytes: usize,
}
2877
/// Three-way result of a lookup that may be driven by an `after_id` cursor.
#[derive(Debug, Clone, PartialEq)]
pub enum GetLookup<T> {
    /// The requested item does not exist.
    NotFound,
    /// Presumably: the supplied `after_id` cursor did not resolve to a known
    /// row — confirm against the call sites.
    UnknownAfterId,
    /// The lookup succeeded and produced this value.
    Found(T),
}
2889
/// One page of a session view.
#[derive(Debug, Clone, PartialEq)]
pub struct SessionPage {
    /// The session being viewed.
    pub session: Session,
    /// Messages included in this page.
    pub messages: Vec<RetrievedMessage>,
    /// Presumably the number of messages not included in this page — confirm
    /// against the code that fills it in.
    pub messages_remaining: usize,
}
2899
/// One page of a single-message view.
#[derive(Debug, Clone, PartialEq)]
pub struct MessagePage {
    /// The session that owns the target message.
    pub session: Session,
    /// The message the view was requested for.
    pub target: RetrievedMessage,
    /// Parts of the target message included in this page.
    pub target_parts: Vec<Part>,
    /// Presumably the number of target parts not included in this page —
    /// confirm against the code that fills it in.
    pub target_parts_remaining: usize,
    /// Other messages returned alongside the target; presumably its
    /// surrounding context — TODO confirm.
    pub siblings: Vec<RetrievedMessage>,
}
2911
/// A message as returned by the retrieval API.
#[derive(Debug, Clone, PartialEq)]
pub struct RetrievedMessage {
    /// Message id.
    pub id: String,
    /// Role of the message author.
    pub role: Role,
    /// Message timestamp (UTC).
    pub timestamp: DateTime<Utc>,
    /// Optional text; presumably the stored `search_text` column — TODO confirm.
    pub text: Option<String>,
    /// Optional content; presumably the stored `content` column — TODO confirm.
    pub content: Option<String>,
    /// Parts attached to the message.
    pub parts: Vec<Part>,
}
2921
/// Internal row shape with the same scalar fields as [`RetrievedMessage`],
/// but without parts.
#[derive(Debug, Clone)]
struct ScanRow {
    /// Message id.
    id: String,
    /// Role of the message author.
    role: Role,
    /// Message timestamp (UTC).
    timestamp: DateTime<Utc>,
    /// Optional text; presumably the stored `search_text` column — TODO confirm.
    text: Option<String>,
    /// Optional content; presumably the stored `content` column — TODO confirm.
    content: Option<String>,
}
2930
/// A message row that carries search text.
#[derive(Debug, Clone)]
pub struct ConversationalRow {
    /// Id of the owning session.
    pub session_id: String,
    /// Id of the message.
    pub message_id: String,
    /// Role of the message author.
    pub role: Role,
    /// Message timestamp (UTC).
    pub timestamp: DateTime<Utc>,
    /// Search text of the message.
    pub text: SearchText,
}
2941
/// Returns how many leading `items` fit into one page.
///
/// At most `limit` items are considered, with `limit` clamped to `1..=1000`.
/// Items are admitted while their cumulative `size` stays within
/// `budget_bytes`; the first item is always admitted, even when it alone
/// exceeds the budget.
fn page_by<T>(items: &[T], limit: usize, budget_bytes: usize, size: impl Fn(&T) -> usize) -> usize {
    let ceiling = limit.clamp(1, 1000);
    let mut consumed = 0usize;
    let mut admitted = 0usize;
    for item in items.iter().take(ceiling) {
        let projected = consumed.saturating_add(size(item));
        if admitted != 0 && projected > budget_bytes {
            return admitted;
        }
        consumed = projected;
        admitted += 1;
    }
    admitted
}
2960
2961fn role_from_str(value: &str) -> Result<Role> {
2962 match value {
2963 "system" => Ok(Role::System),
2964 "user" => Ok(Role::User),
2965 "assistant" => Ok(Role::Assistant),
2966 "tool" => Ok(Role::Tool),
2967 other => anyhow::bail!("unknown message role {other}"),
2968 }
2969}
2970
/// Scalar indices declared for the messages table, as
/// `(column, index type, index name)` triples.
///
/// `pond_index_intents_with_vector_threshold` turns each entry into an
/// `IndexIntent` with the `OnAnyRows` trigger.
const MESSAGE_SCALAR_INDICES: &[(&str, BuiltinIndexType, &str)] = &[
    ("project", BuiltinIndexType::BTree, "messages_project_btree"),
    (
        "session_id",
        BuiltinIndexType::BTree,
        "messages_session_id_btree",
    ),
    (
        "timestamp",
        BuiltinIndexType::BTree,
        "messages_timestamp_btree",
    ),
    (
        "source_agent",
        BuiltinIndexType::Bitmap,
        "messages_source_agent_bitmap",
    ),
    ("role", BuiltinIndexType::Bitmap, "messages_role_bitmap"),
];
2997
/// Scalar indices declared for the parts table, as
/// `(column, index type, index name)` triples.
///
/// `pond_index_intents_with_vector_threshold` turns each entry into an
/// `IndexIntent` with the `OnAnyRows` trigger.
const PARTS_SCALAR_INDICES: &[(&str, BuiltinIndexType, &str)] = &[
    (
        "session_id",
        BuiltinIndexType::BTree,
        "parts_session_id_btree",
    ),
    (
        "message_id",
        BuiltinIndexType::BTree,
        "parts_message_id_btree",
    ),
];
3012
/// Scalar indices declared for the sessions table, as
/// `(column, index type, index name)` triples.
///
/// `pond_index_intents_with_vector_threshold` turns each entry into an
/// `IndexIntent` with the `OnAnyRows` trigger.
const SESSIONS_SCALAR_INDICES: &[(&str, BuiltinIndexType, &str)] =
    &[("id", BuiltinIndexType::BTree, "sessions_id_btree")];
3017
3018fn in_predicate(column: &'static str, values: &[String]) -> Predicate {
3019 Predicate::In(
3020 column,
3021 values.iter().cloned().map(ScalarValue::String).collect(),
3022 )
3023}
3024
3025fn embedded_scope(filter: &Predicate) -> Predicate {
3030 Predicate::And(vec![Predicate::IsNotNull("vector"), filter.clone()])
3031}
3032
3033fn statuses_from_inserted(total: usize, inserted_rows: u64) -> Vec<UpsertStatus> {
3034 let inserted = usize::try_from(inserted_rows)
3035 .unwrap_or(usize::MAX)
3036 .min(total);
3037 let mut statuses = Vec::with_capacity(total);
3038 statuses.extend(std::iter::repeat_n(UpsertStatus::Inserted, inserted));
3039 statuses.extend(std::iter::repeat_n(
3040 UpsertStatus::Matched,
3041 total.saturating_sub(inserted),
3042 ));
3043 statuses
3044}
3045
/// Name of the sessions table.
// NOTE(review): presumably the on-disk table name — confirm against the substrate layer.
pub(crate) const SESSIONS: &str = "sessions";
/// Name of the messages table.
pub(crate) const MESSAGES: &str = "messages";
/// Name of the parts table.
pub(crate) const PARTS: &str = "parts";
3052
/// Name of the full-text index declared on the messages `search_text` column.
pub const MESSAGES_FTS_INDEX: &str = "messages_search_text_fts";

/// Name of the vector index declared on the messages `vector` column.
pub const MESSAGES_VECTOR_INDEX: &str = "messages_vector_ivfpq";
3060
/// `num_bits` passed to the IVF-PQ vector index parameters.
const IVF_PQ_NUM_BITS: u8 = 8;
/// Divisor applied to the embedding dimension to obtain the IVF-PQ
/// `sub_vectors` count.
const IVF_PQ_SUB_VECTOR_STRIDE: usize = 8;
/// `max_iters` passed to the IVF-PQ vector index parameters.
const IVF_PQ_MAX_ITERS: usize = 15;

/// Lower n-gram bound of the full-text index.
const FTS_NGRAM_MIN: u32 = 3;
/// Upper n-gram bound of the full-text index.
const FTS_NGRAM_MAX: u32 = 5;
3075
3076pub fn pond_index_intents() -> IndexIntents {
3079 pond_index_intents_with_vector_threshold(VECTOR_INDEX_ACTIVATION_ROWS)
3080}
3081
3082pub(crate) fn pond_index_intents_with_vector_threshold(vector_threshold: usize) -> IndexIntents {
3086 let mut messages = Vec::with_capacity(MESSAGE_SCALAR_INDICES.len() + 2);
3087 messages.push(IndexIntent {
3088 name: MESSAGES_FTS_INDEX,
3089 column: "search_text",
3090 trigger: IndexTrigger::OnAnyRows,
3091 params: IndexParamsKind::InvertedFtsNgram {
3092 min: FTS_NGRAM_MIN,
3093 max: FTS_NGRAM_MAX,
3094 },
3095 });
3096 for (column, kind, name) in MESSAGE_SCALAR_INDICES {
3097 messages.push(IndexIntent {
3098 name,
3099 column,
3100 trigger: IndexTrigger::OnAnyRows,
3101 params: IndexParamsKind::Scalar(kind.clone()),
3102 });
3103 }
3104 messages.push(IndexIntent {
3105 name: MESSAGES_VECTOR_INDEX,
3106 column: "vector",
3107 trigger: IndexTrigger::OnNonNullCount {
3108 column: "vector",
3109 threshold: vector_threshold,
3110 },
3111 params: IndexParamsKind::IvfPqCosine {
3112 sub_vectors: embedding_dim() / IVF_PQ_SUB_VECTOR_STRIDE,
3113 num_bits: IVF_PQ_NUM_BITS,
3114 max_iters: IVF_PQ_MAX_ITERS,
3115 },
3116 });
3117 let parts = PARTS_SCALAR_INDICES
3118 .iter()
3119 .map(|(column, kind, name)| IndexIntent {
3120 name,
3121 column,
3122 trigger: IndexTrigger::OnAnyRows,
3123 params: IndexParamsKind::Scalar(kind.clone()),
3124 })
3125 .collect();
3126 let sessions = SESSIONS_SCALAR_INDICES
3127 .iter()
3128 .map(|(column, kind, name)| IndexIntent {
3129 name,
3130 column,
3131 trigger: IndexTrigger::OnAnyRows,
3132 params: IndexParamsKind::Scalar(kind.clone()),
3133 })
3134 .collect();
3135 IndexIntents {
3136 sessions,
3137 messages,
3138 parts,
3139 }
3140}
3141
/// Embedding dimension used when none has been set at runtime.
pub const DEFAULT_EMBEDDING_DIM: usize = 384;

/// Process-wide embedding dimension; written at most once.
static EMBEDDING_DIM_RUNTIME: std::sync::OnceLock<usize> = std::sync::OnceLock::new();

/// Returns the embedding dimension in effect: the runtime value if one has
/// been set, otherwise [`DEFAULT_EMBEDDING_DIM`].
pub fn embedding_dim() -> usize {
    *EMBEDDING_DIM_RUNTIME
        .get()
        .unwrap_or(&DEFAULT_EMBEDDING_DIM)
}

/// Sets the runtime embedding dimension.
///
/// Only the first call has an effect; later calls leave the stored value
/// untouched.
pub fn init_embedding_dim(dim: usize) {
    // `set` fails once a value is present, which is exactly "first call wins".
    let _ = EMBEDDING_DIM_RUNTIME.set(dim);
}
3167
3168pub(crate) fn write_params_for_create() -> WriteParams {
3175 WriteParams {
3176 data_storage_version: Some(LanceFileVersion::V2_1),
3177 enable_v2_manifest_paths: true,
3178 enable_stable_row_ids: true,
3179 auto_cleanup: Some(AutoCleanupParams {
3180 interval: 20,
3181 older_than: chrono::TimeDelta::days(1),
3182 }),
3183 skip_auto_cleanup: true,
3184 ..WriteParams::default()
3185 }
3186}
3187
3188fn export_schema(table: Table) -> Arc<Schema> {
3189 match table {
3190 Table::Sessions => session_schema(),
3191 Table::Messages => message_schema(),
3192 Table::Parts => part_schema(),
3193 }
3194}
3195
3196fn ensure_schema_matches_archive(dataset: &Dataset, table: Table) -> Result<()> {
3197 let expected = export_schema(table);
3198 let actual = lance::deps::arrow_schema::Schema::from(dataset.schema());
3199 let actual_names: Vec<_> = actual.fields().iter().map(|field| field.name()).collect();
3200 let expected_names: Vec<_> = expected.fields().iter().map(|field| field.name()).collect();
3201 if actual_names != expected_names {
3202 anyhow::bail!(
3203 "{} archive table has columns {actual_names:?} but this pond build expects {expected_names:?}",
3204 table.as_str(),
3205 );
3206 }
3207 Ok(())
3208}
3209
3210async fn open_archive_table(table: Table, source: &Path) -> Result<Dataset> {
3211 let source_uri = source
3212 .to_str()
3213 .with_context(|| format!("archive path is not UTF-8: {}", source.display()))?;
3214 let dataset = Dataset::open(source_uri)
3215 .await
3216 .with_context(|| format!("failed to open {} archive table", table.as_str()))?;
3217 ensure_schema_matches_archive(&dataset, table)?;
3218 Ok(dataset)
3219}
3220
3221pub(crate) fn session_schema() -> Arc<Schema> {
3222 Arc::new(Schema::new(vec![
3223 primary_field("id", DataType::Utf8, false),
3224 Field::new("parent_session_id", DataType::Utf8, true),
3225 Field::new("parent_message_id", DataType::Utf8, true),
3226 Field::new("source_agent", DataType::Utf8, false),
3227 Field::new(
3228 "created_at",
3229 DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
3230 false,
3231 ),
3232 Field::new("project", DataType::Utf8, false),
3233 json_field("options", false),
3234 ]))
3235}
3236
3237pub(crate) fn message_schema() -> Arc<Schema> {
3238 Arc::new(Schema::new(vec![
3239 primary_field("session_id", DataType::Utf8, false),
3240 primary_field("id", DataType::Utf8, false),
3241 Field::new(
3242 "timestamp",
3243 DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
3244 false,
3245 ),
3246 Field::new("role", DataType::Utf8, false),
3247 Field::new("source_agent", DataType::Utf8, false),
3248 Field::new("project", DataType::Utf8, false),
3249 Field::new("content", DataType::Utf8, true),
3250 Field::new("search_text", DataType::Utf8, true),
3251 Field::new("vector", embedding_vector_type(), true),
3254 Field::new("embedding_model", DataType::Utf8, true),
3255 json_field("options", false),
3256 ]))
3257}
3258
3259pub(crate) fn part_schema() -> Arc<Schema> {
3260 Arc::new(Schema::new(vec![
3261 primary_field("session_id", DataType::Utf8, false),
3262 primary_field("message_id", DataType::Utf8, false),
3263 primary_field("id", DataType::Utf8, false),
3264 Field::new("ordinal", DataType::Int32, false),
3265 Field::new("type", DataType::Utf8, false),
3266 Field::new("provenance", DataType::Utf8, false),
3269 json_field("variant_data", false),
3270 legacy_blob_field("data", true),
3271 json_field("options", false),
3272 ]))
3273}
3274
3275pub(crate) fn empty_batch(schema: Arc<Schema>) -> Result<RecordBatch> {
3276 let arrays = schema
3277 .fields()
3278 .iter()
3279 .map(|field| lance::deps::arrow_array::new_empty_array(field.data_type()))
3280 .collect();
3281 RecordBatch::try_new(schema, arrays).context("failed to build empty Lance batch")
3282}
3283
3284pub(crate) fn empty_reader(
3285 schema: Arc<Schema>,
3286) -> Result<
3287 RecordBatchIterator<
3288 std::vec::IntoIter<Result<RecordBatch, lance::deps::arrow_schema::ArrowError>>,
3289 >,
3290> {
3291 let batch = empty_batch(schema.clone())?;
3292 Ok(RecordBatchIterator::new(
3293 vec![Ok(batch)].into_iter(),
3294 schema,
3295 ))
3296}
3297
/// Borrowed inputs for one row of a messages record batch.
pub(crate) struct MessageBatchRow<'a> {
    /// The message to serialize.
    pub message: &'a Message,
    /// Value written to the row's `source_agent` column.
    pub source_agent: &'a str,
    /// Value written to the row's `project` column.
    pub project: &'a str,
    /// Value written to the row's `search_text` column; `None` becomes null.
    pub search_text: Option<&'a str>,
}
3304
3305fn embedding_vector_type() -> DataType {
3311 DataType::FixedSizeList(
3312 Arc::new(Field::new("item", DataType::Float16, true)),
3313 embedding_dim() as i32,
3314 )
3315}
3316
3317fn embedding_update_schema() -> Arc<Schema> {
3321 Arc::new(Schema::new(vec![
3322 primary_field("session_id", DataType::Utf8, false),
3323 primary_field("id", DataType::Utf8, false),
3324 Field::new("vector", embedding_vector_type(), true),
3325 Field::new("embedding_model", DataType::Utf8, true),
3326 ]))
3327}
3328
3329pub(crate) fn embedding_update_batch(rows: &[EmbeddedMessage]) -> Result<RecordBatch> {
3332 let dim = embedding_dim();
3333 let mut flat = Vec::with_capacity(rows.len() * dim);
3334 for row in rows {
3335 if row.vector.len() != dim {
3336 anyhow::bail!(
3337 "embedding for message {} has dim {}, expected {dim}",
3338 row.id,
3339 row.vector.len(),
3340 );
3341 }
3342 flat.extend(row.vector.iter().map(|value| half::f16::from_f32(*value)));
3343 }
3344 let values = Float16Array::from(flat);
3345 let item_field = Arc::new(Field::new("item", DataType::Float16, true));
3346 let vectors = FixedSizeListArray::try_new(item_field, dim as i32, Arc::new(values), None)
3347 .context("failed to build embedding vector column")?;
3348
3349 RecordBatch::try_new(
3350 embedding_update_schema(),
3351 vec![
3352 Arc::new(StringArray::from(
3353 rows.iter()
3354 .map(|row| row.session_id.as_str())
3355 .collect::<Vec<_>>(),
3356 )),
3357 Arc::new(StringArray::from(
3358 rows.iter().map(|row| row.id.as_str()).collect::<Vec<_>>(),
3359 )),
3360 Arc::new(vectors),
3361 Arc::new(StringArray::from(vec![embed::model_id(); rows.len()])),
3362 ],
3363 )
3364 .context("failed to build embedding update batch")
3365}
3366
/// Byte budget for one record-batch chunk (1 GiB). It is also the exclusive
/// per-cell ceiling enforced by `guard_cell`.
const COLUMN_BYTE_BUDGET: usize = 1 << 30;

/// Splits rows into contiguous index ranges whose summed byte sizes stay
/// within [`COLUMN_BYTE_BUDGET`].
///
/// `cells[i]` is the byte size attributed to row `i`. A new chunk starts
/// whenever adding the next row would push the running total over the budget.
/// A single row larger than the budget still gets a chunk of its own, so
/// every row is covered exactly once and no chunk is empty. Empty input
/// yields no chunks.
///
/// The running total uses saturating arithmetic, so pathological sizes near
/// `usize::MAX` split into separate chunks instead of overflowing.
fn chunk_ranges(cells: &[usize]) -> Vec<std::ops::Range<usize>> {
    let mut chunks = Vec::new();
    let mut start = 0usize;
    let mut running = 0usize;
    for (index, &row) in cells.iter().enumerate() {
        let projected = running.saturating_add(row);
        // `index > start` keeps an oversized first row from producing an empty chunk.
        if projected > COLUMN_BYTE_BUDGET && index > start {
            chunks.push(start..index);
            start = index;
            running = row;
        } else {
            running = projected;
        }
    }
    if start < cells.len() {
        chunks.push(start..cells.len());
    }
    chunks
}
3394
3395fn guard_cell(table: &str, pk: &str, bytes: usize) -> Result<()> {
3396 if bytes >= COLUMN_BYTE_BUDGET {
3397 anyhow::bail!(
3398 "{table} row {pk}: a {bytes}-byte text cell meets the per-cell ceiling and would \
3399 overflow Arrow's i32 offset buffer"
3400 );
3401 }
3402 Ok(())
3403}
3404
3405async fn merge_insert_chunks(
3406 handle: &Handle,
3407 table: Table,
3408 batches: Vec<RecordBatch>,
3409) -> Result<u64> {
3410 let mut inserted = 0u64;
3411 for batch in batches {
3412 let rows = batch.num_rows();
3413 inserted += handle.merge_insert(table, batch, rows).await?;
3414 }
3415 Ok(inserted)
3416}
3417
3418pub(crate) fn sessions_batches(sessions: &[Session]) -> Result<Vec<RecordBatch>> {
3419 let options = sessions
3420 .iter()
3421 .map(|session| json_bytes(&session.options))
3422 .collect::<Result<Vec<_>>>()?;
3423 let mut cells = Vec::with_capacity(sessions.len());
3424 for (session, encoded) in sessions.iter().zip(&options) {
3425 let columns = [
3426 session.id.len(),
3427 session.parent_session_id.as_deref().map_or(0, str::len),
3428 session.parent_message_id.as_deref().map_or(0, str::len),
3429 session.source_agent.len(),
3430 session.project.as_str().len(),
3431 encoded.len(),
3432 ];
3433 for bytes in columns {
3434 guard_cell("sessions", &session.id, bytes)?;
3435 }
3436 cells.push(columns.iter().sum());
3437 }
3438 chunk_ranges(&cells)
3439 .into_iter()
3440 .map(|range| sessions_chunk(&sessions[range.clone()], &options[range]))
3441 .collect()
3442}
3443
3444fn sessions_chunk(sessions: &[Session], options: &[Vec<u8>]) -> Result<RecordBatch> {
3445 let schema = session_schema();
3446 RecordBatch::try_new(
3447 schema.clone(),
3448 vec![
3449 Arc::new(StringArray::from(
3450 sessions
3451 .iter()
3452 .map(|session| session.id.as_str())
3453 .collect::<Vec<_>>(),
3454 )),
3455 Arc::new(StringArray::from(
3456 sessions
3457 .iter()
3458 .map(|session| session.parent_session_id.as_deref())
3459 .collect::<Vec<_>>(),
3460 )),
3461 Arc::new(StringArray::from(
3462 sessions
3463 .iter()
3464 .map(|session| session.parent_message_id.as_deref())
3465 .collect::<Vec<_>>(),
3466 )),
3467 Arc::new(StringArray::from(
3468 sessions
3469 .iter()
3470 .map(|session| session.source_agent.as_str())
3471 .collect::<Vec<_>>(),
3472 )),
3473 Arc::new(
3474 TimestampMicrosecondArray::from(
3475 sessions
3476 .iter()
3477 .map(|session| micros(session.created_at))
3478 .collect::<Vec<_>>(),
3479 )
3480 .with_timezone("UTC"),
3481 ),
3482 Arc::new(StringArray::from(
3483 sessions
3484 .iter()
3485 .map(|session| session.project.as_str())
3486 .collect::<Vec<_>>(),
3487 )),
3488 Arc::new(LargeBinaryArray::from_iter_values(
3489 options.iter().map(Vec::as_slice),
3490 )),
3491 ],
3492 )
3493 .context("failed to build session batch")
3494}
3495
3496pub(crate) fn messages_batches(rows: &[MessageBatchRow<'_>]) -> Result<Vec<RecordBatch>> {
3497 let options = rows
3498 .iter()
3499 .map(|row| json_bytes(row.message.options()))
3500 .collect::<Result<Vec<_>>>()?;
3501 let mut cells = Vec::with_capacity(rows.len());
3502 for (row, encoded) in rows.iter().zip(&options) {
3503 let columns = [
3504 row.message.session_id().len(),
3505 row.message.id().len(),
3506 row.message.role().as_str().len(),
3507 row.source_agent.len(),
3508 row.project.len(),
3509 row.message.system_content().map_or(0, str::len),
3510 row.search_text.map_or(0, str::len),
3511 encoded.len(),
3512 ];
3513 for bytes in columns {
3514 guard_cell("messages", row.message.id(), bytes)?;
3515 }
3516 cells.push(columns.iter().sum());
3517 }
3518 chunk_ranges(&cells)
3519 .into_iter()
3520 .map(|range| messages_chunk(&rows[range.clone()], &options[range]))
3521 .collect()
3522}
3523
3524fn messages_chunk(rows: &[MessageBatchRow<'_>], options: &[Vec<u8>]) -> Result<RecordBatch> {
3525 let schema = message_schema();
3526 RecordBatch::try_new(
3527 schema.clone(),
3528 vec![
3529 Arc::new(StringArray::from(
3530 rows.iter()
3531 .map(|row| row.message.session_id())
3532 .collect::<Vec<_>>(),
3533 )),
3534 Arc::new(StringArray::from(
3535 rows.iter().map(|row| row.message.id()).collect::<Vec<_>>(),
3536 )),
3537 Arc::new(
3538 TimestampMicrosecondArray::from(
3539 rows.iter()
3540 .map(|row| micros(row.message.timestamp()))
3541 .collect::<Vec<_>>(),
3542 )
3543 .with_timezone("UTC"),
3544 ),
3545 Arc::new(StringArray::from(
3546 rows.iter()
3547 .map(|row| row.message.role().as_str())
3548 .collect::<Vec<_>>(),
3549 )),
3550 Arc::new(StringArray::from(
3551 rows.iter().map(|row| row.source_agent).collect::<Vec<_>>(),
3552 )),
3553 Arc::new(StringArray::from(
3554 rows.iter().map(|row| row.project).collect::<Vec<_>>(),
3555 )),
3556 Arc::new(StringArray::from(
3557 rows.iter()
3558 .map(|row| row.message.system_content())
3559 .collect::<Vec<_>>(),
3560 )),
3561 Arc::new(StringArray::from(
3562 rows.iter().map(|row| row.search_text).collect::<Vec<_>>(),
3563 )),
3564 new_null_array(&embedding_vector_type(), rows.len()),
3568 new_null_array(&DataType::Utf8, rows.len()),
3569 Arc::new(LargeBinaryArray::from_iter_values(
3570 options.iter().map(Vec::as_slice),
3571 )),
3572 ],
3573 )
3574 .context("failed to build message batch")
3575}
3576
3577pub(crate) fn parts_batches(parts: &[Part]) -> Result<Vec<RecordBatch>> {
3578 let variant_data = parts
3579 .iter()
3580 .map(|part| part_variant_json(&part.kind))
3581 .collect::<Result<Vec<_>>>()?;
3582 let options = parts
3583 .iter()
3584 .map(|part| json_bytes(&part.options))
3585 .collect::<Result<Vec<_>>>()?;
3586 let mut cells = Vec::with_capacity(parts.len());
3587 for ((part, variant), encoded) in parts.iter().zip(&variant_data).zip(&options) {
3590 let columns = [
3591 part.session_id.len(),
3592 part.message_id.len(),
3593 part.id.len(),
3594 part.kind.type_name().len(),
3595 part.provenance.as_str().len(),
3596 variant.len(),
3597 encoded.len(),
3598 ];
3599 for bytes in columns {
3600 guard_cell("parts", &part.id, bytes)?;
3601 }
3602 cells.push(columns.iter().sum());
3603 }
3604 chunk_ranges(&cells)
3605 .into_iter()
3606 .map(|range| {
3607 parts_chunk(
3608 &parts[range.clone()],
3609 &variant_data[range.clone()],
3610 &options[range],
3611 )
3612 })
3613 .collect()
3614}
3615
3616fn parts_chunk(
3617 parts: &[Part],
3618 variant_data: &[Vec<u8>],
3619 options: &[Vec<u8>],
3620) -> Result<RecordBatch> {
3621 let schema = part_schema();
3622 let blob_payloads: Vec<Option<&[u8]>> = parts
3626 .iter()
3627 .map(|part| match &part.kind {
3628 PartKind::File { data, .. } => Some(match data {
3629 FileData::String(value) => value.as_bytes(),
3630 FileData::Bytes(value) => value.as_slice(),
3631 FileData::Url(value) => value.as_bytes(),
3632 }),
3633 PartKind::Text { .. }
3634 | PartKind::Reasoning { .. }
3635 | PartKind::ToolCall { .. }
3636 | PartKind::ToolResult { .. }
3637 | PartKind::ToolApprovalRequest { .. }
3638 | PartKind::ToolApprovalResponse { .. } => None,
3639 })
3640 .collect();
3641 let blob_array = LargeBinaryArray::from_iter(blob_payloads);
3642
3643 RecordBatch::try_new(
3644 schema.clone(),
3645 vec![
3646 Arc::new(StringArray::from(
3647 parts
3648 .iter()
3649 .map(|part| part.session_id.as_str())
3650 .collect::<Vec<_>>(),
3651 )),
3652 Arc::new(StringArray::from(
3653 parts
3654 .iter()
3655 .map(|part| part.message_id.as_str())
3656 .collect::<Vec<_>>(),
3657 )),
3658 Arc::new(StringArray::from(
3659 parts
3660 .iter()
3661 .map(|part| part.id.as_str())
3662 .collect::<Vec<_>>(),
3663 )),
3664 Arc::new(Int32Array::from(
3665 parts.iter().map(|part| part.ordinal).collect::<Vec<_>>(),
3666 )),
3667 Arc::new(StringArray::from(
3668 parts
3669 .iter()
3670 .map(|part| part.kind.type_name())
3671 .collect::<Vec<_>>(),
3672 )),
3673 Arc::new(StringArray::from(
3674 parts
3675 .iter()
3676 .map(|part| part.provenance.as_str())
3677 .collect::<Vec<_>>(),
3678 )),
3679 Arc::new(LargeBinaryArray::from_iter_values(
3680 variant_data.iter().map(Vec::as_slice),
3681 )),
3682 Arc::new(blob_array),
3683 Arc::new(LargeBinaryArray::from_iter_values(
3684 options.iter().map(Vec::as_slice),
3685 )),
3686 ],
3687 )
3688 .context("failed to build parts batch")
3689}
3690
3691pub(crate) fn session_from_batch(batch: &RecordBatch, row: usize) -> Result<Session> {
3692 Ok(Session {
3693 id: string(batch, "id", row)?.context("session id is null")?,
3694 parent_session_id: string(batch, "parent_session_id", row)?,
3695 parent_message_id: string(batch, "parent_message_id", row)?,
3696 source_agent: string(batch, "source_agent", row)?.context("source_agent is null")?,
3697 created_at: datetime(batch, "created_at", row)?,
3698 project: crate::adapter::Extracted::from_stored(
3699 string(batch, "project", row)?.context("project is null")?,
3700 ),
3701 options: json_parse(&json_column(batch, "options", row)?.context("options is null")?)?,
3702 })
3703}
3704
3705pub(crate) fn message_from_batch(batch: &RecordBatch, row: usize) -> Result<Message> {
3706 let id = string(batch, "id", row)?.context("message id is null")?;
3707 let session_id = string(batch, "session_id", row)?.context("message session_id is null")?;
3708 let timestamp = datetime(batch, "timestamp", row)?;
3709 let options =
3710 json_parse(&json_column(batch, "options", row)?.context("message options is null")?)?;
3711
3712 match string(batch, "role", row)?
3713 .context("message role is null")?
3714 .as_str()
3715 {
3716 "system" => Ok(Message::System {
3717 id,
3718 session_id,
3719 timestamp,
3720 content: string(batch, "content", row)?.map(crate::adapter::Extracted::from_stored),
3727 options,
3728 }),
3729 "user" => Ok(Message::User {
3730 id,
3731 session_id,
3732 timestamp,
3733 options,
3734 }),
3735 "assistant" => Ok(Message::Assistant {
3736 id,
3737 session_id,
3738 timestamp,
3739 options,
3740 }),
3741 "tool" => Ok(Message::Tool {
3742 id,
3743 session_id,
3744 timestamp,
3745 options,
3746 }),
3747 other => anyhow::bail!("unknown message role {other}"),
3748 }
3749}
3750
3751pub(crate) fn part_from_batch(
3752 batch: &RecordBatch,
3753 row: usize,
3754 file_data: Option<FileData>,
3755) -> Result<Part> {
3756 let type_name = string(batch, "type", row)?.context("part type is null")?;
3757 let variant_data = json_column(batch, "variant_data", row)?.context("variant_data is null")?;
3758 let provenance = string(batch, "provenance", row)?.context("part provenance is null")?;
3759 Ok(Part {
3760 session_id: string(batch, "session_id", row)?.context("part session_id is null")?,
3761 message_id: string(batch, "message_id", row)?.context("part message_id is null")?,
3762 id: string(batch, "id", row)?.context("part id is null")?,
3763 ordinal: int32(batch, "ordinal", row)?,
3764 provenance: provenance_from_str(&provenance)?,
3765 options: json_parse(&json_column(batch, "options", row)?.context("part options is null")?)?,
3766 kind: part_kind_from_json(&type_name, &variant_data, file_data)?,
3767 })
3768}
3769
3770fn provenance_from_str(value: &str) -> Result<crate::wire::Provenance> {
3771 match value {
3772 "conversational" => Ok(crate::wire::Provenance::Conversational),
3773 "injected" => Ok(crate::wire::Provenance::Injected),
3774 other => anyhow::bail!("unknown part provenance {other}"),
3775 }
3776}
3777
3778fn file_data_from_blob(variant_data: &[u8], bytes: &[u8]) -> Result<FileData> {
3779 let kind = file_data_kind(variant_data)?;
3780 match kind.as_str() {
3781 "string" => {
3782 let text = std::str::from_utf8(bytes)
3783 .context("file string payload is not UTF-8")?
3784 .to_owned();
3785 Ok(FileData::String(text))
3786 }
3787 "bytes" => Ok(FileData::Bytes(bytes.to_vec())),
3788 "url" => Ok(FileData::Url(
3789 std::str::from_utf8(bytes)
3790 .context("file URL payload is not UTF-8")?
3791 .to_owned(),
3792 )),
3793 other => anyhow::bail!("unknown file data_kind {other}"),
3794 }
3795}
3796
3797fn file_data_kind(variant_data: &[u8]) -> Result<String> {
3798 let value = json_parse::<Value>(variant_data)?;
3799 value
3800 .get("data_kind")
3801 .and_then(Value::as_str)
3802 .map(str::to_owned)
3803 .context("file part variant_data missing data_kind")
3804}
3805
3806fn uint64<'a>(batch: &'a RecordBatch, name: &str) -> Result<&'a UInt64Array> {
3807 batch
3808 .column_by_name(name)
3809 .with_context(|| format!("missing column {name}"))?
3810 .as_any()
3811 .downcast_ref::<UInt64Array>()
3812 .with_context(|| format!("column {name} is not UInt64"))
3813}
3814
3815pub(crate) fn string(batch: &RecordBatch, name: &str, row: usize) -> Result<Option<String>> {
3816 let array = batch
3817 .column_by_name(name)
3818 .with_context(|| format!("missing column {name}"))?
3819 .as_any()
3820 .downcast_ref::<StringArray>()
3821 .with_context(|| format!("column {name} is not Utf8"))?;
3822 if array.is_null(row) {
3823 Ok(None)
3824 } else {
3825 Ok(Some(array.value(row).to_owned()))
3826 }
3827}
3828
3829fn json_column(batch: &RecordBatch, name: &str, row: usize) -> Result<Option<Vec<u8>>> {
3830 let column = batch
3834 .column_by_name(name)
3835 .with_context(|| format!("missing column {name}"))?;
3836 if let Some(array) = column.as_any().downcast_ref::<LargeBinaryArray>() {
3837 return if array.is_null(row) {
3838 Ok(None)
3839 } else {
3840 Ok(Some(
3841 lance_arrow::json::decode_json(array.value(row)).into_bytes(),
3842 ))
3843 };
3844 }
3845 if let Some(array) = column.as_any().downcast_ref::<StringArray>() {
3846 return if array.is_null(row) {
3847 Ok(None)
3848 } else {
3849 Ok(Some(array.value(row).as_bytes().to_vec()))
3850 };
3851 }
3852 if let Some(array) = column.as_any().downcast_ref::<LargeStringArray>() {
3853 return if array.is_null(row) {
3854 Ok(None)
3855 } else {
3856 Ok(Some(array.value(row).as_bytes().to_vec()))
3857 };
3858 }
3859 anyhow::bail!("column {name} is not a JSON-compatible array")
3860}
3861
3862fn int32(batch: &RecordBatch, name: &str, row: usize) -> Result<i32> {
3863 let array = batch
3864 .column_by_name(name)
3865 .with_context(|| format!("missing column {name}"))?
3866 .as_any()
3867 .downcast_ref::<Int32Array>()
3868 .with_context(|| format!("column {name} is not Int32"))?;
3869 Ok(array.value(row))
3870}
3871
3872pub(crate) fn float32(batch: &RecordBatch, name: &str, row: usize) -> Result<f32> {
3873 let array = batch
3874 .column_by_name(name)
3875 .with_context(|| format!("missing column {name}"))?
3876 .as_any()
3877 .downcast_ref::<Float32Array>()
3878 .with_context(|| format!("column {name} is not Float32"))?;
3879 Ok(array.value(row))
3880}
3881
3882pub(crate) fn datetime(batch: &RecordBatch, name: &str, row: usize) -> Result<DateTime<Utc>> {
3883 let array = batch
3884 .column_by_name(name)
3885 .with_context(|| format!("missing column {name}"))?
3886 .as_any()
3887 .downcast_ref::<TimestampMicrosecondArray>()
3888 .with_context(|| format!("column {name} is not timestamp_micros"))?;
3889 Utc.timestamp_micros(array.value(row))
3890 .single()
3891 .context("timestamp is out of range")
3892}
3893
3894fn primary_field(name: &str, data_type: DataType, nullable: bool) -> Field {
3895 Field::new(name, data_type, nullable).with_metadata(
3896 [(
3897 "lance-schema:unenforced-primary-key".to_owned(),
3898 "true".to_owned(),
3899 )]
3900 .into(),
3901 )
3902}
3903
3904fn legacy_blob_field(name: &str, nullable: bool) -> Field {
3914 Field::new(name, DataType::LargeBinary, nullable).with_metadata(
3915 [(lance_arrow::BLOB_META_KEY.to_owned(), "true".to_owned())]
3916 .into_iter()
3917 .collect(),
3918 )
3919}
3920
3921fn json_field(name: &str, nullable: bool) -> Field {
3922 lance_arrow::json::json_field(name, nullable)
3923}
3924
3925fn micros(timestamp: DateTime<Utc>) -> i64 {
3926 timestamp.timestamp_micros()
3927}
3928
3929fn json_bytes<T: Serialize>(value: &T) -> Result<Vec<u8>> {
3930 let text = serde_json::to_string(value).context("failed to serialize JSON field")?;
3938 lance_arrow::json::encode_json(&text)
3939 .map_err(|err| anyhow::anyhow!("failed to encode JSON field as JSONB: {err}"))
3940}
3941
3942fn json_parse<T: DeserializeOwned>(value: &[u8]) -> Result<T> {
3943 serde_json::from_slice(value).context("failed to parse JSON field")
3944}
3945
3946fn part_variant_json(kind: &PartKind) -> Result<Vec<u8>> {
3947 if let PartKind::File {
3948 media_type,
3949 file_name,
3950 data,
3951 } = kind
3952 {
3953 let data_kind = match data {
3954 FileData::String(_) => "string",
3955 FileData::Bytes(_) => "bytes",
3956 FileData::Url(_) => "url",
3957 };
3958 return json_bytes(&serde_json::json!({
3959 "media_type": media_type,
3960 "file_name": file_name,
3961 "data_kind": data_kind,
3962 }));
3963 }
3964 let value = serde_json::to_value(kind)?;
3965 let mut object = value
3966 .as_object()
3967 .cloned()
3968 .context("part variant did not serialize to an object")?;
3969 object.remove("type");
3970 json_bytes(&object)
3971}
3972
3973fn part_kind_from_json(
3974 type_name: &str,
3975 variant_data: &[u8],
3976 file_data: Option<FileData>,
3977) -> Result<PartKind> {
3978 let mut value = json_parse::<Value>(variant_data)?;
3979 let object = value
3980 .as_object_mut()
3981 .context("part variant data is not an object")?;
3982 object.insert("type".to_owned(), Value::String(type_name.to_owned()));
3983 if let Some(data) = file_data {
3984 object.remove("data_kind");
3985 object.insert("data".to_owned(), serde_json::to_value(data)?);
3986 }
3987 serde_json::from_value(value).context("failed to parse part kind")
3988}
3989
3990#[cfg(test)]
3991mod tests {
3992 #![allow(clippy::expect_used, clippy::unwrap_used)]
3993
3994 use super::*;
3995 use crate::{
3996 adapter::Extracted,
3997 handlers::ingest_events,
3998 wire::{FileData, Message, Part, PartKind, ProviderOptions, Session},
3999 };
4000 use chrono::Utc;
4001 use serde_json::json;
4002 use tempfile::TempDir;
4003
4004 fn synthetic_session(id: &str) -> Session {
4005 Session {
4006 id: id.to_owned(),
4007 parent_session_id: None,
4008 parent_message_id: None,
4009 source_agent: "claude-code".to_owned(),
4010 created_at: Utc::now(),
4011 project: crate::adapter::Extracted::from_test_value("/tmp/pond".to_owned()),
4012 options: ProviderOptions::new(),
4013 }
4014 }
4015
4016 #[test]
4017 fn search_text_excludes_injected_parts() {
4018 use crate::wire::Provenance;
4019 let message = Message::User {
4020 id: "m1".to_owned(),
4021 session_id: "s1".to_owned(),
4022 timestamp: Utc::now(),
4023 options: ProviderOptions::new(),
4024 };
4025 let text_part = |id: &str, text: &str, provenance: Provenance| Part {
4026 session_id: "s1".to_owned(),
4027 id: id.to_owned(),
4028 message_id: "m1".to_owned(),
4029 ordinal: 0,
4030 provenance,
4031 options: ProviderOptions::new(),
4032 kind: PartKind::Text {
4033 text: Some(Extracted::from_test_value(text.to_owned())),
4034 },
4035 };
4036
4037 let conversational = search_text(
4040 &message,
4041 &[text_part(
4042 "p1",
4043 "real human prompt",
4044 Provenance::Conversational,
4045 )],
4046 );
4047 assert_eq!(conversational.as_deref(), Some("real human prompt"));
4048
4049 let injected = search_text(
4050 &message,
4051 &[text_part(
4052 "p2",
4053 "<task-notification>...</task-notification>",
4054 Provenance::Injected,
4055 )],
4056 );
4057 assert!(
4058 injected.is_none(),
4059 "a message whose only part is injected has null search_text"
4060 );
4061 }
4062
4063 #[test]
4064 fn chunk_ranges_splits_on_byte_budget() {
4065 assert!(chunk_ranges(&[]).is_empty());
4066 assert_eq!(chunk_ranges(&[10, 10, 10]), vec![0..3]);
4067
4068 let two_thirds = COLUMN_BYTE_BUDGET * 2 / 3;
4069 assert_eq!(
4070 chunk_ranges(&[two_thirds, two_thirds, two_thirds]),
4071 vec![0..1, 1..2, 2..3],
4072 );
4073
4074 assert_eq!(
4076 chunk_ranges(&[10, COLUMN_BYTE_BUDGET + 1, 10]),
4077 vec![0..1, 1..2, 2..3],
4078 );
4079 }
4080
4081 #[tokio::test]
4082 async fn ordering_violation_drops_only_the_offending_event() -> anyhow::Result<()> {
4083 let temp = TempDir::new()?;
4088 let store = Store::open_local(temp.path()).await?;
4089 let session = synthetic_session("ordering");
4090 let orphan_part = Part {
4091 session_id: session.id.clone(),
4092 id: "orphan-part".to_owned(),
4093 message_id: "missing-message".to_owned(),
4094 ordinal: 0,
4095 provenance: crate::wire::Provenance::Conversational,
4096 options: ProviderOptions::new(),
4097 kind: PartKind::Text {
4098 text: Some(Extracted::from_test_value("orphan".to_owned())),
4099 },
4100 };
4101 let valid_message = Message::User {
4102 id: "valid-message".to_owned(),
4103 session_id: session.id.clone(),
4104 timestamp: Utc::now(),
4105 options: ProviderOptions::new(),
4106 };
4107 let valid_part = Part {
4108 session_id: session.id.clone(),
4109 id: "valid-part".to_owned(),
4110 message_id: valid_message.id().to_owned(),
4111 ordinal: 0,
4112 provenance: crate::wire::Provenance::Conversational,
4113 options: ProviderOptions::new(),
4114 kind: PartKind::Text {
4115 text: Some(Extracted::from_test_value("kept".to_owned())),
4116 },
4117 };
4118
4119 let mut validator = IngestValidator::default();
4120 validator
4121 .push(&store, 0, IngestEvent::Session(session.clone()))
4122 .await?;
4123 let part_outcomes = validator
4124 .push(&store, 1, IngestEvent::Part(orphan_part))
4125 .await?;
4126 assert_eq!(part_outcomes.len(), 1);
4127 assert_eq!(part_outcomes[0].kind, "part");
4128 assert_eq!(part_outcomes[0].status, OutcomeStatus::Error);
4129 assert!(
4130 part_outcomes[0]
4131 .error
4132 .as_ref()
4133 .map(|e| e.message.contains("part event appeared before a message"))
4134 .unwrap_or(false),
4135 "error message must explain the ordering violation: {part_outcomes:?}"
4136 );
4137 validator
4138 .push(&store, 2, IngestEvent::Message(valid_message))
4139 .await?;
4140 validator
4141 .push(&store, 3, IngestEvent::Part(valid_part))
4142 .await?;
4143 validator.finish(&store).await?;
4144
4145 let (sessions, messages, parts) = store.row_counts().await?;
4146 assert_eq!(sessions, 1, "session committed despite the orphan part");
4147 assert_eq!(messages, 1, "valid message committed");
4148 assert_eq!(parts, 1, "valid part committed; the orphan was dropped");
4149
4150 Ok(())
4151 }
4152
4153 #[tokio::test]
4154 async fn duplicate_message_id_drops_the_second_keeps_the_first() -> anyhow::Result<()> {
4155 let temp = TempDir::new()?;
4159 let store = Store::open_local(temp.path()).await?;
4160 let session = synthetic_session("duplicate-message");
4161 let first = Message::User {
4162 id: "message-1".to_owned(),
4163 session_id: session.id.clone(),
4164 timestamp: Utc::now(),
4165 options: ProviderOptions::new(),
4166 };
4167 let second = Message::Assistant {
4168 id: "message-1".to_owned(),
4169 session_id: session.id.clone(),
4170 timestamp: Utc::now(),
4171 options: ProviderOptions::new(),
4172 };
4173
4174 let mut validator = IngestValidator::default();
4175 validator
4176 .push(&store, 0, IngestEvent::Session(session.clone()))
4177 .await?;
4178 validator
4179 .push(&store, 1, IngestEvent::Message(first))
4180 .await?;
4181 let dup_outcomes = validator
4182 .push(&store, 2, IngestEvent::Message(second))
4183 .await?;
4184 assert_eq!(dup_outcomes.len(), 1);
4185 assert_eq!(dup_outcomes[0].status, OutcomeStatus::Error);
4186 assert!(
4187 dup_outcomes[0]
4188 .error
4189 .as_ref()
4190 .map(|e| e.message.contains("duplicate message id message-1"))
4191 .unwrap_or(false),
4192 "duplicate-id rejection must name the offending id: {dup_outcomes:?}"
4193 );
4194
4195 validator.finish(&store).await?;
4196 let (sessions, messages, _) = store.row_counts().await?;
4197 assert_eq!(sessions, 1, "session committed");
4198 assert_eq!(messages, 1, "only the first message committed");
4199
4200 Ok(())
4201 }
4202
4203 #[tokio::test]
4204 async fn ingest_stamps_host_provenance_on_messages_and_strips_spoofed_pond_key()
4205 -> anyhow::Result<()> {
4206 let temp = TempDir::new()?;
4210 let store = Store::open_local(temp.path()).await?;
4211 let session = synthetic_session("host-provenance");
4212 let mut spoofed = ProviderOptions::new();
4213 spoofed.insert("pond".to_owned(), json!({"ingest": {"host": "spoofed"}}));
4214 let message = Message::User {
4215 id: "message-1".to_owned(),
4216 session_id: session.id.clone(),
4217 timestamp: Utc::now(),
4218 options: spoofed,
4219 };
4220 let part = Part {
4221 session_id: session.id.clone(),
4222 id: "part-1".to_owned(),
4223 message_id: "message-1".to_owned(),
4224 ordinal: 0,
4225 provenance: crate::wire::Provenance::Conversational,
4226 options: ProviderOptions::new(),
4227 kind: PartKind::Text {
4228 text: Some(Extracted::from_test_value("hello".to_owned())),
4229 },
4230 };
4231
4232 let mut validator = IngestValidator::default();
4233 validator
4234 .push(&store, 0, IngestEvent::Session(session.clone()))
4235 .await?;
4236 validator
4237 .push(&store, 1, IngestEvent::Message(message))
4238 .await?;
4239 validator.push(&store, 2, IngestEvent::Part(part)).await?;
4240 validator.finish(&store).await?;
4241
4242 let stored = store
4243 .get_session(&session.id)
4244 .await?
4245 .expect("ingested session is readable");
4246 assert!(
4247 !stored.session.options.contains_key("pond"),
4248 "session rows are not stamped (attribution derives from messages)"
4249 );
4250 let stored_message = &stored.messages[0].message;
4251 match ingest_host_stamp() {
4252 Some(stamp) => {
4253 assert_eq!(
4254 stored_message.options().get("pond"),
4255 Some(stamp),
4256 "stored message carries the real stamp, never the spoof"
4257 );
4258 let host = stamp
4259 .pointer("/ingest/host")
4260 .and_then(Value::as_object)
4261 .expect("stamp shape is {ingest: {host: {..}}}");
4262 assert!(!host.is_empty(), "an all-empty stamp must be None instead");
4263 assert!(
4264 host.values()
4265 .all(|v| v.as_str().is_some_and(|s| !s.is_empty())),
4266 "stamp fields are omitted when unavailable, never empty: {host:?}"
4267 );
4268 }
4269 None => assert!(
4270 stored_message.options().get("pond").is_none(),
4271 "with no resolvable stamp the spoofed key is still stripped"
4272 ),
4273 }
4274 assert!(
4275 !stored.messages[0].parts[0].options.contains_key("pond"),
4276 "part rows are not stamped (covered by their message's stamp)"
4277 );
4278
4279 Ok(())
4280 }
4281
4282 #[tokio::test(flavor = "multi_thread")]
4290 async fn optimize_indices_compacts_parts_with_blob_column() -> anyhow::Result<()> {
4291 use crate::wire::{FileData, PartKind, Provenance};
4292 let temp = TempDir::new()?;
4293 let store = Store::open_local(temp.path()).await?;
4294
4295 let session = synthetic_session("compact-blob");
4296 store
4297 .upsert_sessions(std::slice::from_ref(&session))
4298 .await?;
4299
4300 let make_part = |idx: usize, kind: PartKind| Part {
4301 session_id: session.id.clone(),
4302 message_id: format!("msg-{idx}"),
4303 id: format!("part-{idx}"),
4304 ordinal: 0,
4305 provenance: Provenance::Conversational,
4306 options: ProviderOptions::new(),
4307 kind,
4308 };
4309
4310 let batch_a = vec![
4311 make_part(
4312 0,
4313 PartKind::File {
4314 media_type: Some("text/plain".to_owned()),
4315 file_name: Some("a.txt".to_owned()),
4316 data: FileData::Bytes(b"alpha".to_vec()),
4317 },
4318 ),
4319 make_part(
4320 1,
4321 PartKind::File {
4322 media_type: Some("text/plain".to_owned()),
4323 file_name: Some("b.txt".to_owned()),
4324 data: FileData::String("beta".to_owned()),
4325 },
4326 ),
4327 ];
4328 store.upsert_parts(&batch_a).await?;
4329
4330 let batch_b = vec![
4331 make_part(
4332 2,
4333 PartKind::File {
4334 media_type: Some("application/octet-stream".to_owned()),
4335 file_name: None,
4336 data: FileData::Url("https://example.com/file".to_owned()),
4337 },
4338 ),
4339 make_part(
4340 3,
4341 PartKind::File {
4342 media_type: Some("image/png".to_owned()),
4343 file_name: Some("c.png".to_owned()),
4344 data: FileData::Bytes(vec![0x89, 0x50, 0x4e, 0x47]),
4345 },
4346 ),
4347 ];
4348 store.upsert_parts(&batch_b).await?;
4349
4350 store
4351 .optimize_indices(None, &MaintenancePolicy::always_compact())
4352 .await?
4353 .into_result()?;
4354
4355 Ok(())
4356 }
4357
4358 #[tokio::test]
4359 async fn file_part_blob_v2_round_trips_through_get() -> anyhow::Result<()> {
4360 let temp = TempDir::new()?;
4361 let store = Store::open_local(temp.path()).await?;
4362 let session = synthetic_session("blob");
4363 let message = Message::User {
4364 id: "message-1".to_owned(),
4365 session_id: session.id.clone(),
4366 timestamp: Utc::now(),
4367 options: ProviderOptions::new(),
4368 };
4369 let part = Part {
4370 session_id: session.id.clone(),
4371 id: "part-1".to_owned(),
4372 message_id: message.id().to_owned(),
4373 ordinal: 0,
4374 provenance: crate::wire::Provenance::Conversational,
4375 options: ProviderOptions::new(),
4376 kind: PartKind::File {
4377 media_type: Some("text/plain".to_owned()),
4378 file_name: Some("payload.txt".to_owned()),
4379 data: FileData::Bytes(b"pond".to_vec()),
4380 },
4381 };
4382
4383 let mut validator = IngestValidator::default();
4384 validator
4385 .push(&store, 0, IngestEvent::Session(session.clone()))
4386 .await?;
4387 validator
4388 .push(&store, 1, IngestEvent::Message(message.clone()))
4389 .await?;
4390 validator
4391 .push(&store, 2, IngestEvent::Part(part.clone()))
4392 .await?;
4393 validator.finish(&store).await?;
4394
4395 let stored = store
4396 .get_session(&session.id)
4397 .await?
4398 .expect("session should exist");
4399 let stored_part = &stored.messages[0].parts[0];
4400 assert_eq!(stored_part, &part);
4401
4402 Ok(())
4403 }
4404
4405 fn base_session() -> Session {
4416 Session {
4417 id: "01HXY00000000001".to_owned(),
4418 parent_session_id: None,
4419 parent_message_id: None,
4420 source_agent: "claude-code".to_owned(),
4421 created_at: Utc::now(),
4422 project: crate::adapter::Extracted::from_test_value("/home/me/proj".to_owned()),
4423 options: ProviderOptions::new(),
4424 }
4425 }
4426
4427 fn count_status(outcomes: &[RowOutcome], target: OutcomeStatus) -> usize {
4428 outcomes
4429 .iter()
4430 .filter(|outcome| outcome.status == target)
4431 .count()
4432 }
4433
4434 #[tokio::test(flavor = "multi_thread")]
4435 async fn re_ingesting_a_session_with_unchanged_immutable_fields_is_idempotent()
4436 -> anyhow::Result<()> {
4437 let temp = TempDir::new()?;
4438 let store = Store::open_local(temp.path()).await?;
4439
4440 let first = ingest_events(&store, vec![IngestEvent::Session(base_session())]).await?;
4441 assert_eq!(count_status(&first, OutcomeStatus::Inserted), 1);
4442
4443 let mut again = base_session();
4444 again.options.insert("title".to_owned(), json!("renamed"));
4445 let second = ingest_events(&store, vec![IngestEvent::Session(again)]).await?;
4446 assert_eq!(
4447 count_status(&second, OutcomeStatus::Error),
4448 0,
4449 "options is mutable; the re-ingest must not surface an error: {second:?}",
4450 );
4451 assert_eq!(
4452 count_status(&second, OutcomeStatus::Matched),
4453 1,
4454 "unchanged immutable fields must match-insert via merge_insert",
4455 );
4456
4457 Ok(())
4458 }
4459
4460 #[tokio::test(flavor = "multi_thread")]
4461 async fn re_ingesting_with_changed_source_agent_is_rejected() -> anyhow::Result<()> {
4462 let temp = TempDir::new()?;
4463 let store = Store::open_local(temp.path()).await?;
4464
4465 let first = ingest_events(&store, vec![IngestEvent::Session(base_session())]).await?;
4466 assert_eq!(count_status(&first, OutcomeStatus::Error), 0);
4467
4468 let mut tampered = base_session();
4469 tampered.source_agent = "codex-cli".to_owned();
4470 let second = ingest_events(&store, vec![IngestEvent::Session(tampered)]).await?;
4471 assert_eq!(count_status(&second, OutcomeStatus::Error), 1);
4472 let err_row = second
4473 .iter()
4474 .find(|outcome| outcome.status == OutcomeStatus::Error)
4475 .expect("error outcome present");
4476 let err = err_row.error.as_ref().expect("error body present");
4477 assert_eq!(err.field, Some("source_agent"));
4478 assert_eq!(err.reason, Some("immutable"));
4479
4480 let stored = store
4482 .get_session(&base_session().id)
4483 .await?
4484 .expect("session row survives the rejected re-ingest");
4485 assert_eq!(stored.session.source_agent, "claude-code");
4486
4487 Ok(())
4488 }
4489
4490 #[tokio::test(flavor = "multi_thread")]
4491 async fn re_ingesting_with_changed_project_is_rejected() -> anyhow::Result<()> {
4492 let temp = TempDir::new()?;
4493 let store = Store::open_local(temp.path()).await?;
4494
4495 let first = ingest_events(&store, vec![IngestEvent::Session(base_session())]).await?;
4496 assert_eq!(count_status(&first, OutcomeStatus::Error), 0);
4497
4498 let mut tampered = base_session();
4499 tampered.project = crate::adapter::Extracted::from_test_value("/somewhere/else".to_owned());
4500 let second = ingest_events(&store, vec![IngestEvent::Session(tampered)]).await?;
4501 let err_row = second
4502 .iter()
4503 .find(|outcome| outcome.status == OutcomeStatus::Error)
4504 .expect("project change must surface an error outcome");
4505 assert_eq!(err_row.error.as_ref().unwrap().field, Some("project"));
4506
4507 let stored = store
4508 .get_session(&base_session().id)
4509 .await?
4510 .expect("session row survives");
4511 assert_eq!(
4512 stored.session.project.as_str(),
4513 "/home/me/proj",
4514 "stored project must remain the original",
4515 );
4516
4517 Ok(())
4518 }
4519
4520 #[tokio::test(flavor = "multi_thread")]
4521 async fn batched_flush_attributes_new_messages_on_existing_session() -> anyhow::Result<()> {
4522 use crate::wire::Provenance;
4530 let temp = TempDir::new()?;
4531 let store = Store::open_local(temp.path()).await?;
4532 let session = base_session();
4533
4534 let text_part = |part_id: &str, message_id: &str, body: &str| Part {
4535 session_id: session.id.clone(),
4536 id: part_id.to_owned(),
4537 message_id: message_id.to_owned(),
4538 ordinal: 0,
4539 provenance: Provenance::Conversational,
4540 options: ProviderOptions::new(),
4541 kind: PartKind::Text {
4542 text: Some(Extracted::from_test_value(body.to_owned())),
4543 },
4544 };
4545 let user_message = |id: &str| Message::User {
4546 id: id.to_owned(),
4547 session_id: session.id.clone(),
4548 timestamp: Utc::now(),
4549 options: ProviderOptions::new(),
4550 };
4551
4552 let mut validator = IngestValidator::default();
4554 validator
4555 .push(&store, 0, IngestEvent::Session(session.clone()))
4556 .await?;
4557 validator
4558 .push(&store, 1, IngestEvent::Message(user_message("m1")))
4559 .await?;
4560 validator
4561 .push(&store, 2, IngestEvent::Part(text_part("p1", "m1", "alpha")))
4562 .await?;
4563 validator
4564 .push(&store, 3, IngestEvent::Message(user_message("m2")))
4565 .await?;
4566 validator
4567 .push(&store, 4, IngestEvent::Part(text_part("p2", "m2", "beta")))
4568 .await?;
4569 let (_first_outcomes, first_counts) = validator.finish(&store).await?;
4570 assert_eq!(first_counts.sessions_inserted, 1);
4571 assert_eq!(first_counts.messages_inserted_total, 2);
4572 assert_eq!(first_counts.messages_inserted_searchable, 2);
4573
4574 let mut validator = IngestValidator::default();
4576 validator
4577 .push(&store, 0, IngestEvent::Session(session.clone()))
4578 .await?;
4579 for (idx, mid) in ["m3", "m4", "m5"].iter().enumerate() {
4580 let pid = format!("p{}", idx + 3);
4581 validator
4582 .push(&store, idx * 2 + 1, IngestEvent::Message(user_message(mid)))
4583 .await?;
4584 validator
4585 .push(
4586 &store,
4587 idx * 2 + 2,
4588 IngestEvent::Part(text_part(&pid, mid, "gamma")),
4589 )
4590 .await?;
4591 }
4592 let (second_outcomes, second_counts) = validator.finish(&store).await?;
4593
4594 assert_eq!(
4595 second_counts.sessions_inserted, 0,
4596 "existing session row must report as Matched, not Inserted",
4597 );
4598 assert_eq!(second_counts.sessions_matched, 1);
4599 assert_eq!(
4600 second_counts.messages_inserted_total, 3,
4601 "the three NEW messages must register as Inserted in BatchCounts",
4602 );
4603 assert_eq!(
4604 second_counts.messages_inserted_searchable, 3,
4605 "all three new messages carry conversational text -> searchable",
4606 );
4607 assert_eq!(second_counts.messages_matched_total, 0);
4608 assert_eq!(second_counts.parts_inserted, 3);
4609 assert_eq!(second_counts.parts_matched, 0);
4610
4611 let session_outcome = second_outcomes
4614 .iter()
4615 .find(|outcome| outcome.kind == "session")
4616 .expect("session-row outcome present");
4617 assert_eq!(session_outcome.status, OutcomeStatus::Matched);
4618 for outcome in &second_outcomes {
4619 if outcome.kind == "message" || outcome.kind == "part" {
4620 assert_eq!(
4621 outcome.status,
4622 OutcomeStatus::Inserted,
4623 "new row must be Inserted, got: {outcome:?}",
4624 );
4625 }
4626 }
4627 Ok(())
4628 }
4629
4630 async fn store_with_messages(
4634 temp: &TempDir,
4635 count: usize,
4636 ) -> anyhow::Result<(Store, Vec<MessageKey>)> {
4637 store_with_messages_at_threshold(temp, count, VECTOR_INDEX_ACTIVATION_ROWS).await
4638 }
4639
4640 async fn store_with_messages_at_threshold(
4643 temp: &TempDir,
4644 count: usize,
4645 _vector_threshold: usize,
4646 ) -> anyhow::Result<(Store, Vec<MessageKey>)> {
4647 let store = Store::open_local(temp.path()).await?;
4648 let sessions = 8.min(count.max(1));
4649 let mut events = Vec::new();
4650 for s in 0..sessions {
4651 events.push(IngestEvent::Session(Session {
4652 id: format!("session-{s}"),
4653 parent_session_id: None,
4654 parent_message_id: None,
4655 source_agent: "claude-code".to_owned(),
4656 created_at: Utc::now(),
4657 project: Extracted::from_test_value(format!("/proj/{}", s % 4)),
4658 options: ProviderOptions::new(),
4659 }));
4660 for i in (s..count).step_by(sessions) {
4661 let message_id = format!("msg-{i}");
4662 events.push(IngestEvent::Message(Message::User {
4663 id: message_id.clone(),
4664 session_id: format!("session-{s}"),
4665 timestamp: Utc::now(),
4666 options: ProviderOptions::new(),
4667 }));
4668 events.push(IngestEvent::Part(Part {
4669 session_id: format!("session-{s}"),
4670 id: format!("{message_id}-part"),
4671 message_id,
4672 ordinal: 0,
4673 provenance: crate::wire::Provenance::Conversational,
4674 options: ProviderOptions::new(),
4675 kind: PartKind::Text {
4676 text: Some(Extracted::from_test_value(format!("synthetic message {i}"))),
4677 },
4678 }));
4679 }
4680 }
4681 ingest_events(&store, events).await?;
4682 let keys = (0..count)
4683 .map(|i| MessageKey {
4684 session_id: format!("session-{}", i % sessions),
4685 message_id: format!("msg-{i}"),
4686 })
4687 .collect();
4688 Ok((store, keys))
4689 }
4690
4691 fn synthetic_vector(seed: usize) -> Vec<f32> {
4693 let mut state = (seed as u64)
4694 .wrapping_mul(0x9E37_79B9_7F4A_7C15)
4695 .wrapping_add(1);
4696 (0..embedding_dim())
4697 .map(|_| {
4698 state = state.wrapping_mul(6364136223846793005).wrapping_add(1);
4699 #[allow(clippy::cast_precision_loss)]
4700 let unit = (state >> 33) as f32 / (1u64 << 31) as f32;
4701 unit - 1.0
4702 })
4703 .collect()
4704 }
4705
4706 fn embedded(keys: &[MessageKey]) -> Vec<EmbeddedMessage> {
4708 keys.iter()
4709 .enumerate()
4710 .map(|(seed, key)| EmbeddedMessage {
4711 session_id: key.session_id.clone(),
4712 id: key.message_id.clone(),
4713 vector: synthetic_vector(seed),
4714 })
4715 .collect()
4716 }
4717
4718 fn embedding_update_batch_with_model(
4719 rows: &[EmbeddedMessage],
4720 model: &str,
4721 ) -> Result<RecordBatch> {
4722 let mut batch = embedding_update_batch(rows)?;
4723 let columns = batch
4724 .columns()
4725 .iter()
4726 .take(3)
4727 .cloned()
4728 .chain(std::iter::once(
4729 Arc::new(StringArray::from(vec![model; rows.len()])) as _,
4730 ))
4731 .collect::<Vec<_>>();
4732 batch = RecordBatch::try_new(batch.schema(), columns)?;
4733 Ok(batch)
4734 }
4735
4736 #[tokio::test]
4737 async fn filtered_vector_scan_pushes_scalar_predicate_into_the_index() -> anyhow::Result<()> {
4738 let temp = TempDir::new()?;
4739 let (store, keys) = store_with_messages(&temp, 4).await?;
4743 store.write_embeddings(&embedded(&keys)).await?;
4744 store
4745 .optimize_indices(None, &MaintenancePolicy::always_compact())
4746 .await?
4747 .into_result()?;
4748
4749 let query = vec![0.01_f32; embedding_dim()];
4750 let plan = store
4751 .explain_vector_plan(
4752 &query,
4753 10,
4754 &Predicate::Eq("session_id", "session-3".into()),
4755 None,
4756 )
4757 .await?;
4758
4759 assert!(
4764 plan.contains("ScalarIndexQuery"),
4765 "expected a ScalarIndexQuery node in the plan:\n{plan}",
4766 );
4767 let predicate_postfiltered = plan
4768 .lines()
4769 .any(|line| line.contains("FilterExec") && line.contains("session_id"));
4770 assert!(
4771 !predicate_postfiltered,
4772 "the scalar predicate must not fall back to a FilterExec postfilter:\n{plan}",
4773 );
4774 Ok(())
4775 }
4776
4777 #[tokio::test]
4778 async fn vector_index_activates_when_threshold_is_crossed() -> anyhow::Result<()> {
4779 let temp = TempDir::new()?;
4780 let (store, keys) = store_with_messages_at_threshold(&temp, 300, 256).await?;
4781
4782 store.write_embeddings(&embedded(&keys[..255])).await?;
4785 store
4786 .optimize_indices_with_vector_threshold(256)
4787 .await?
4788 .into_result()?;
4789 assert!(
4790 !store
4791 .handle
4792 .messages_index_names()
4793 .await?
4794 .iter()
4795 .any(|name| name == MESSAGES_VECTOR_INDEX),
4796 "IVF_PQ must not exist below the activation threshold",
4797 );
4798
4799 store.write_embeddings(&embedded(&keys[255..256])).await?;
4802 store
4803 .optimize_indices_with_vector_threshold(256)
4804 .await?
4805 .into_result()?;
4806 assert!(
4807 store
4808 .handle
4809 .messages_index_names()
4810 .await?
4811 .iter()
4812 .any(|name| name == MESSAGES_VECTOR_INDEX),
4813 "optimize must create the IVF_PQ once the threshold is crossed",
4814 );
4815
4816 let hits = store
4819 .vector_search(&synthetic_vector(0), 10, &Predicate::And(Vec::new()), None)
4820 .await?;
4821 assert!(
4822 hits.iter().any(|(key, _)| key == &keys[0]),
4823 "an embedded row is retrievable via the index",
4824 );
4825 Ok(())
4826 }
4827
4828 #[tokio::test]
4829 async fn model_swap_force_re_embeds_only_stale_rows_and_rebuilds_ivf_pq() -> anyhow::Result<()>
4830 {
4831 let temp = TempDir::new()?;
4832 let (store, keys) = store_with_messages_at_threshold(&temp, 300, 256).await?;
4833 let old_rows = embedded(&keys);
4834 let old_batch = embedding_update_batch_with_model(&old_rows, "old-model")?;
4835 store
4836 .handle
4837 .merge_update(Table::Messages, old_batch, old_rows.len())
4838 .await?;
4839 store
4840 .optimize_indices_with_vector_threshold(256)
4841 .await?
4842 .into_result()?;
4843 assert!(
4844 store
4845 .handle
4846 .messages_index_names()
4847 .await?
4848 .iter()
4849 .any(|name| name == MESSAGES_VECTOR_INDEX),
4850 "IVF_PQ must exist before a model swap",
4851 );
4852 assert_eq!(store.stale_embedding_count().await?, keys.len());
4853
4854 store.drop_vector_index().await?;
4855 let mut pending = Vec::new();
4856 let stream = store.pending_or_stale_messages();
4857 tokio::pin!(stream);
4858 while let Some(row) = stream.next().await {
4859 pending.push(row?);
4860 }
4861 assert_eq!(
4862 pending.len(),
4863 keys.len(),
4864 "force stream should see stale rows"
4865 );
4866 store.write_embeddings(&embedded(&keys)).await?;
4867 assert_eq!(store.stale_embedding_count().await?, 0);
4868 store
4869 .optimize_indices_with_vector_threshold(256)
4870 .await?
4871 .into_result()?;
4872 assert!(
4873 store
4874 .handle
4875 .messages_index_names()
4876 .await?
4877 .iter()
4878 .any(|name| name == MESSAGES_VECTOR_INDEX),
4879 "optimize must rebuild IVF_PQ after force re-embed",
4880 );
4881
4882 let stream = store.pending_or_stale_messages();
4883 tokio::pin!(stream);
4884 assert!(stream.next().await.is_none(), "up-to-date rows are skipped");
4885 Ok(())
4886 }
4887
4888 #[tokio::test]
4889 async fn session_last_ingested_at_falls_back_when_versions_pruned() -> anyhow::Result<()> {
4890 let temp = TempDir::new()?;
4899 let (store, _keys) = store_with_messages(&temp, 4).await?;
4900
4901 for tag in 0..3 {
4904 let extra = synthetic_session(&format!("extra-{tag}"));
4905 store.upsert_sessions(&[extra]).await?;
4906 }
4907
4908 let dataset = store.handle.dataset(Table::Sessions).await?;
4913 dataset
4914 .cleanup_old_versions(chrono::Duration::zero(), None, Some(false))
4915 .await
4916 .context("cleanup_old_versions failed")?;
4917
4918 let map = store.session_last_ingested_at().await?;
4919 let session_count = store.row_counts().await?.0;
4920 assert!(
4921 map.len() >= session_count,
4922 "watermark map ({}) must still cover every session ({}) after \
4923 version cleanup; an empty fallback regresses pond sync to a \
4924 full re-scan",
4925 map.len(),
4926 session_count,
4927 );
4928 Ok(())
4929 }
4930
4931 #[tokio::test]
4932 async fn embedding_progress_counts_embedded_and_eligible_rows() -> anyhow::Result<()> {
4933 let temp = TempDir::new()?;
4934 let (store, keys) = store_with_messages(&temp, 10).await?;
4935
4936 let before = store.embedding_progress().await?;
4937 assert_eq!(before.embedded, 0);
4938 assert_eq!(before.total, 10);
4939 assert_eq!(before.model, crate::embed::model_id());
4940
4941 store.write_embeddings(&embedded(&keys[..4])).await?;
4942 let partial = store.embedding_progress().await?;
4943 assert_eq!(partial.embedded, 4);
4944 assert_eq!(partial.total, 10);
4945
4946 store.write_embeddings(&embedded(&keys[4..])).await?;
4947 let full = store.embedding_progress().await?;
4948 assert_eq!(full.embedded, 10);
4949 assert_eq!(full.total, 10);
4950 Ok(())
4951 }
4952}