1use std::{
2 collections::{BTreeMap, HashMap, HashSet},
3 path::Path,
4 sync::Arc,
5};
6
7use anyhow::{Context, Result};
8use async_stream::try_stream;
9use chrono::{DateTime, TimeZone, Utc};
10use lance::Dataset;
11use lance::dataset::{AutoCleanupParams, WriteMode, WriteParams};
12use lance::deps::arrow_array::{
13 Array, FixedSizeListArray, Float16Array, Float32Array, Int32Array, LargeBinaryArray,
14 LargeStringArray, RecordBatch, RecordBatchIterator, StringArray, TimestampMicrosecondArray,
15 UInt64Array, new_null_array,
16};
17use lance::deps::arrow_schema::{DataType, Field, Schema, TimeUnit};
18use lance_file::version::LanceFileVersion;
19use lance_index::scalar::{BuiltinIndexType, FullTextSearchQuery};
20use serde::{Deserialize, Serialize, de::DeserializeOwned};
21use serde_json::Value;
22use tokio_stream::{Stream, StreamExt};
23
24use crate::{
25 config, embed,
26 substrate::{
27 Handle, IndexIntent, IndexParamsKind, IndexStatus, IndexTrigger, OptimizeProgressFn,
28 PhaseOutcome, Predicate, ScalarValue, ScanOpts, Table, TableOptimizeOutcome, TableSizes,
29 VECTOR_INDEX_ACTIVATION_ROWS,
30 },
31 wire::{FileData, Message, Part, PartKind, ResponseMode, Role, SUMMARY_PART_TYPES, Session},
32};
33use url::Url;
34
/// Storage facade over the sessions, messages and parts tables.
///
/// Every scan, merge and version lookup performed by the methods below goes
/// through the wrapped substrate [`Handle`].
#[derive(Debug)]
pub struct Store {
    /// Substrate handle used for all dataset access.
    handle: Handle,
}
39
40#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
41pub struct LanceArchiveCounts {
42 pub sessions: usize,
43 pub messages: usize,
44 pub parts: usize,
45}
46
47#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
48pub struct LanceArchiveVersions {
49 pub sessions: u64,
50 pub messages: u64,
51 pub parts: u64,
52}
53
54#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
55pub struct LanceArchiveExport {
56 pub rows: LanceArchiveCounts,
57 pub source_versions: LanceArchiveVersions,
58}
59
60#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
61pub struct LanceArchiveImport {
62 pub rows: LanceArchiveCounts,
63 pub inserted: LanceArchiveCounts,
64}
65
66#[derive(Debug, Clone, Default)]
67pub struct IndexIntents {
68 pub sessions: Vec<IndexIntent>,
69 pub messages: Vec<IndexIntent>,
70 pub parts: Vec<IndexIntent>,
71}
72
73impl IndexIntents {
74 fn all(&self) -> [(Table, &[IndexIntent]); 3] {
75 [
76 (Table::Sessions, &self.sessions),
77 (Table::Messages, &self.messages),
78 (Table::Parts, &self.parts),
79 ]
80 }
81}
82
/// A message row yielded by the pending-embedding scans.
#[derive(Debug, Clone)]
#[derive(PartialEq)]
pub struct PendingMessage {
    /// Session the message belongs to.
    pub session_id: String,
    /// Message id.
    pub id: String,
    /// Text to be embedded.
    pub search_text: String,
}
92
/// An embedding vector to be written onto an existing message row.
#[derive(Debug, Clone)]
#[derive(PartialEq)]
pub struct EmbeddedMessage {
    /// Session the message belongs to.
    pub session_id: String,
    /// Message id.
    pub id: String,
    /// Embedding values.
    pub vector: Vec<f32>,
}
101
102#[derive(Debug, Clone, PartialEq)]
104pub struct MessageMeta {
105 pub message_id: String,
106 pub session_id: String,
107 pub role: String,
108 pub project: String,
109 pub source_agent: String,
110 pub timestamp: DateTime<Utc>,
111 pub search_text: String,
112}
113
/// Identifies a message by `(session_id, message_id)`.
///
/// The derived ordering compares `session_id` first, then `message_id`, so the
/// field order below is significant.
#[derive(Debug, Clone, Hash)]
#[derive(PartialEq, Eq, PartialOrd, Ord)]
pub struct MessageKey {
    /// Owning session id (primary sort key).
    pub session_id: String,
    /// Message id (secondary sort key).
    pub message_id: String,
}
119
/// Per-row result of an upsert.
#[derive(Debug, Clone, Copy)]
#[derive(PartialEq, Eq)]
pub enum UpsertStatus {
    /// The row was reported as newly inserted.
    Inserted,
    /// The row matched an existing row.
    Matched,
}
125
126#[derive(Debug, Default)]
131pub struct OptimizeOutcome {
132 pub tables: Vec<TableOptimizeOutcome>,
133}
134
135impl OptimizeOutcome {
136 pub fn any_indices_failed(&self) -> bool {
139 self.tables.iter().any(|t| t.indices.is_failed())
140 }
141
142 pub fn into_result(self) -> Result<Self> {
146 for table in &self.tables {
147 if let PhaseOutcome::Failed(error) = &table.indices {
148 anyhow::bail!(
149 "indices phase failed on {}: {error:#}",
150 table.table.as_str()
151 );
152 }
153 if let PhaseOutcome::Failed(error) = &table.compaction {
154 anyhow::bail!(
155 "compaction phase failed on {}: {error:#}",
156 table.table.as_str()
157 );
158 }
159 }
160 Ok(self)
161 }
162}
163
164pub fn default_cleanup_older_than() -> chrono::Duration {
171 chrono::Duration::days(1)
172}
173
174#[derive(Debug, Clone, Copy)]
179pub struct CleanupConfig {
180 pub older_than: chrono::Duration,
181 pub delete_unverified: bool,
182}
183
184impl Default for CleanupConfig {
185 fn default() -> Self {
186 Self {
187 older_than: default_cleanup_older_than(),
188 delete_unverified: false,
189 }
190 }
191}
192
193#[derive(Debug, Clone)]
196pub struct CorpusStats {
197 pub data_url: Url,
198 pub totals: RowTotals,
199 pub adapters: Vec<AdapterStats>,
207 pub include_subagents: bool,
211}
212
/// Row counts for the three store tables.
#[derive(Debug, Clone, Copy)]
#[derive(PartialEq, Eq)]
pub struct RowTotals {
    /// Rows in the sessions table.
    pub sessions: u64,
    /// Rows in the messages table.
    pub messages: u64,
    /// Rows in the parts table.
    pub parts: u64,
}
219
/// Snapshot of embedding progress: `embedded` out of `total`, using model `model`.
#[derive(Debug, Clone, Copy)]
#[derive(PartialEq, Eq)]
pub struct EmbeddingProgress {
    /// Count completed so far.
    pub embedded: usize,
    /// Overall count.
    pub total: usize,
    /// Model identifier.
    pub model: &'static str,
}
231
232#[derive(Debug, Clone)]
233pub struct AdapterStats {
234 pub adapter: String,
238 pub sessions: u64,
239 pub messages: u64,
240 pub projects: Vec<ProjectStats>,
243}
244
/// Message and distinct-session counts for one project within an adapter.
#[derive(Debug)]
#[derive(Clone)]
pub struct ProjectStats {
    /// Project name.
    pub project: String,
    /// Distinct sessions seen for the project.
    pub sessions: u64,
    /// Message rows seen for the project.
    pub messages: u64,
}
251
/// Running tally for one `(source_agent, project)` group, built while
/// `Store::corpus_stats` scans the messages table.
#[derive(Default)]
struct GroupAccumulator {
    /// Number of message rows seen for the group.
    messages: u64,
    /// Distinct session ids seen for the group; its length is the session count.
    session_ids: HashSet<String>,
}
257
258#[derive(Debug, Clone, Copy)]
259pub struct MessageWrite<'a> {
260 pub message: &'a Message,
261 pub parts: &'a [Part],
262 pub search_text: Option<&'a str>,
263}
264
265impl Store {
266 pub async fn open(location: &Url) -> Result<Self> {
272 Ok(Self {
273 handle: Handle::open(location).await?,
274 })
275 }
276
277 pub async fn open_with_options(
283 location: &Url,
284 storage_options: std::collections::HashMap<String, String>,
285 caps: crate::substrate::RuntimeCaps,
286 ) -> Result<Self> {
287 Ok(Self {
288 handle: Handle::open_with_options(location, storage_options, caps).await?,
289 })
290 }
291
292 pub async fn open_local(path: impl AsRef<std::path::Path>) -> Result<Self> {
297 let url = config::url_for_path(path)?;
298 Self::open_with_options(
299 &url,
300 std::collections::HashMap::new(),
301 crate::substrate::RuntimeCaps::default(),
302 )
303 .await
304 }
305
306 pub async fn export_clean_lance_datasets(&self, dest: &Path) -> Result<LanceArchiveExport> {
314 std::fs::create_dir_all(dest)
315 .with_context(|| format!("failed to create archive staging dir {}", dest.display()))?;
316 let (sessions, sessions_version) = self
317 .export_clean_table(Table::Sessions, &dest.join("sessions.lance"))
318 .await?;
319 let (messages, messages_version) = self
320 .export_clean_table(Table::Messages, &dest.join("messages.lance"))
321 .await?;
322 let (parts, parts_version) = self
323 .export_clean_table(Table::Parts, &dest.join("parts.lance"))
324 .await?;
325 Ok(LanceArchiveExport {
326 rows: LanceArchiveCounts {
327 sessions,
328 messages,
329 parts,
330 },
331 source_versions: LanceArchiveVersions {
332 sessions: sessions_version,
333 messages: messages_version,
334 parts: parts_version,
335 },
336 })
337 }
338
339 pub async fn import_clean_lance_datasets(&self, source: &Path) -> Result<LanceArchiveImport> {
340 let sessions_dataset =
341 open_archive_table(Table::Sessions, &source.join("sessions.lance")).await?;
342 let messages_dataset =
343 open_archive_table(Table::Messages, &source.join("messages.lance")).await?;
344 let parts_dataset = open_archive_table(Table::Parts, &source.join("parts.lance")).await?;
345 let (sessions, sessions_inserted) = self
346 .import_clean_table(Table::Sessions, sessions_dataset)
347 .await?;
348 let (messages, messages_inserted) = self
349 .import_clean_table(Table::Messages, messages_dataset)
350 .await?;
351 let (parts, parts_inserted) = self.import_clean_table(Table::Parts, parts_dataset).await?;
352 Ok(LanceArchiveImport {
353 rows: LanceArchiveCounts {
354 sessions,
355 messages,
356 parts,
357 },
358 inserted: LanceArchiveCounts {
359 sessions: sessions_inserted,
360 messages: messages_inserted,
361 parts: parts_inserted,
362 },
363 })
364 }
365
366 async fn export_clean_table(&self, table: Table, dest: &Path) -> Result<(usize, u64)> {
367 let dataset = self.handle.dataset(table).await?;
368 let source_version = dataset.version_id();
369 let schema = export_schema(table);
370 let mut stream = dataset
371 .scan()
372 .try_into_stream()
373 .await
374 .with_context(|| format!("failed to scan {} for archive export", table.as_str()))?;
375 let dest_uri = dest
376 .to_str()
377 .with_context(|| format!("archive path is not UTF-8: {}", dest.display()))?;
378
379 let mut rows = 0usize;
380 let mut wrote = false;
381 while let Some(batch) = stream.next().await {
382 let batch = batch
383 .with_context(|| format!("failed to read {} archive batch", table.as_str()))?;
384 rows += batch.num_rows();
385 let reader = RecordBatchIterator::new([Ok(batch.clone())], batch.schema());
386 let mut params = write_params_for_create();
387 if wrote {
388 params.mode = WriteMode::Append;
389 }
390 Dataset::write(reader, dest_uri, Some(params))
391 .await
392 .with_context(|| format!("failed to write {} archive table", table.as_str()))?;
393 wrote = true;
394 }
395
396 if !wrote {
397 let batch = RecordBatch::new_empty(schema.clone());
398 let reader = RecordBatchIterator::new([Ok(batch)], schema);
399 Dataset::write(reader, dest_uri, Some(write_params_for_create()))
400 .await
401 .with_context(|| {
402 format!("failed to write empty {} archive table", table.as_str())
403 })?;
404 }
405 Ok((rows, source_version))
406 }
407
408 async fn import_clean_table(&self, table: Table, dataset: Dataset) -> Result<(usize, usize)> {
409 let mut stream = dataset
410 .scan()
411 .try_into_stream()
412 .await
413 .with_context(|| format!("failed to scan {} archive table", table.as_str()))?;
414 let mut rows = 0usize;
415 let mut inserted = 0usize;
416 while let Some(batch) = stream.next().await {
417 let batch = batch
418 .with_context(|| format!("failed to read {} archive batch", table.as_str()))?;
419 let row_count = batch.num_rows();
420 rows += row_count;
421 inserted += self
422 .handle
423 .merge_insert(table, batch, row_count)
424 .await
425 .with_context(|| format!("failed to import {} archive table", table.as_str()))?
426 as usize;
427 }
428 Ok((rows, inserted))
429 }
430
431 pub async fn upsert_sessions(&self, sessions: &[Session]) -> Result<Vec<UpsertStatus>> {
432 if sessions.is_empty() {
433 return Ok(Vec::new());
434 }
435 let batches = sessions_batches(sessions)?;
436 let inserted = merge_insert_chunks(&self.handle, Table::Sessions, batches).await?;
437 Ok(statuses_from_inserted(sessions.len(), inserted))
439 }
440
441 async fn upsert_session_batch(
465 &self,
466 substreams: Vec<CompletedSubstream>,
467 ) -> Result<Vec<RowOutcome>> {
468 if substreams.is_empty() {
469 return Ok(Vec::new());
470 }
471
472 let mut outcomes: Vec<RowOutcome> = Vec::with_capacity(substreams.len());
473
474 let mut merged: Vec<CompletedSubstream> = Vec::with_capacity(substreams.len());
478 let mut by_session_id: std::collections::HashMap<String, usize> =
479 std::collections::HashMap::with_capacity(substreams.len());
480 for substream in substreams {
481 if let Some(&existing_idx) = by_session_id.get(&substream.session.id) {
482 let existing = &merged[existing_idx];
483 if existing.session.source_agent != substream.session.source_agent
484 || existing.session.project != substream.session.project
485 {
486 let reason = if existing.session.source_agent != substream.session.source_agent
491 {
492 IngestError::ImmutableField {
493 field: "source_agent",
494 session_id: substream.session.id.clone(),
495 stored: existing.session.source_agent.clone(),
496 attempted: substream.session.source_agent.clone(),
497 }
498 } else {
499 IngestError::ImmutableField {
500 field: "project",
501 session_id: substream.session.id.clone(),
502 stored: (*existing.session.project).clone(),
503 attempted: (*substream.session.project).clone(),
504 }
505 };
506 let field = match &reason {
507 IngestError::ImmutableField { field, .. } => Some(*field),
508 };
509 let reason_key = match field {
510 Some("project") => DROP_REASON_IMMUTABLE_PROJECT,
511 Some("source_agent") => DROP_REASON_IMMUTABLE_SOURCE_AGENT,
512 _ => DROP_REASON_UNCATEGORIZED,
513 };
514 outcomes.extend(error_outcomes_for_substream(
515 substream.session_index,
516 &substream.session,
517 &substream.messages,
518 reason.to_string(),
519 field,
520 reason_key,
521 ));
522 continue;
523 }
524 let existing = &mut merged[existing_idx];
529 let mut seen: std::collections::HashSet<String> = existing
530 .messages
531 .iter()
532 .map(|m| m.message.id().to_owned())
533 .collect();
534 for msg in substream.messages {
535 if seen.insert(msg.message.id().to_owned()) {
536 existing.messages.push(msg);
537 }
538 }
539 continue;
540 }
541 by_session_id.insert(substream.session.id.clone(), merged.len());
542 merged.push(substream);
543 }
544
545 let mut writeable: Vec<CompletedSubstream> = Vec::with_capacity(merged.len());
546 for substream in merged {
547 if let Some(existing) = self.find_session(&substream.session.id).await?
548 && let Err(failure) = ensure_immutable_match(&existing, &substream.session)
549 {
550 let field = match &failure {
551 IngestError::ImmutableField { field, .. } => Some(*field),
552 };
553 let reason_key = match field {
554 Some("project") => DROP_REASON_IMMUTABLE_PROJECT,
555 Some("source_agent") => DROP_REASON_IMMUTABLE_SOURCE_AGENT,
556 _ => DROP_REASON_UNCATEGORIZED,
557 };
558 outcomes.extend(error_outcomes_for_substream(
559 substream.session_index,
560 &substream.session,
561 &substream.messages,
562 failure.to_string(),
563 field,
564 reason_key,
565 ));
566 continue;
567 }
568 writeable.push(substream);
569 }
570
571 if writeable.is_empty() {
572 outcomes.sort_by_key(|outcome| outcome.index);
573 return Ok(outcomes);
574 }
575
576 let sessions_owned: Vec<Session> = writeable
577 .iter()
578 .map(|substream| substream.session.clone())
579 .collect();
580 let message_rows: Vec<MessageBatchRow<'_>> = writeable
581 .iter()
582 .flat_map(|substream| {
583 substream.messages.iter().map(|buffered| MessageBatchRow {
584 message: &buffered.message,
585 source_agent: &substream.session.source_agent,
586 project: &substream.session.project,
587 search_text: buffered.search_text.as_deref(),
588 })
589 })
590 .collect();
591 let part_rows: Vec<Part> = writeable
592 .iter()
593 .flat_map(|substream| {
594 substream.messages.iter().flat_map(|buffered| {
595 buffered
596 .parts
597 .iter()
598 .map(|buffered_part| buffered_part.part.clone())
599 })
600 })
601 .collect();
602
603 let session_batches = sessions_batches(&sessions_owned)?;
604 let message_batches = messages_batches(&message_rows)?;
605 let part_batches = parts_batches(&part_rows)?;
606
607 let sessions_count = sessions_owned.len();
608
609 let (sessions_inserted, messages_inserted, parts_inserted) = tokio::try_join!(
610 merge_insert_chunks(&self.handle, Table::Sessions, session_batches),
611 merge_insert_chunks(&self.handle, Table::Messages, message_batches),
612 merge_insert_chunks(&self.handle, Table::Parts, part_batches),
613 )?;
614 let sessions_status = if sessions_inserted == sessions_count as u64 {
622 UpsertStatus::Inserted
623 } else if sessions_inserted == 0 {
624 UpsertStatus::Matched
625 } else {
626 UpsertStatus::Inserted
630 };
631 let _ = messages_inserted; let _ = parts_inserted;
633
634 for substream in &writeable {
635 outcomes.extend(success_outcomes_for_substream(
636 substream.session_index,
637 &substream.session,
638 &substream.messages,
639 sessions_status,
640 ));
641 }
642
643 outcomes.sort_by_key(|outcome| outcome.index);
644 Ok(outcomes)
645 }
646
647 pub async fn upsert_messages(
648 &self,
649 session: &Session,
650 messages: &[MessageWrite<'_>],
651 ) -> Result<Vec<UpsertStatus>> {
652 if messages.is_empty() {
653 return Ok(Vec::new());
654 }
655
656 let rows = messages
657 .iter()
658 .map(|write| MessageBatchRow {
659 message: write.message,
660 source_agent: &session.source_agent,
661 project: &session.project,
662 search_text: write.search_text,
663 })
664 .collect::<Vec<_>>();
665 let batches = messages_batches(&rows)?;
666 let inserted = merge_insert_chunks(&self.handle, Table::Messages, batches).await?;
667 Ok(statuses_from_inserted(messages.len(), inserted))
668 }
669
670 pub async fn upsert_parts(&self, parts: &[Part]) -> Result<Vec<UpsertStatus>> {
671 if parts.is_empty() {
672 return Ok(Vec::new());
673 }
674 let batches = parts_batches(parts)?;
675 let inserted = merge_insert_chunks(&self.handle, Table::Parts, batches).await?;
676 Ok(statuses_from_inserted(parts.len(), inserted))
677 }
678
679 pub async fn get_session(&self, session_id: &str) -> Result<Option<SessionWithMessages>> {
680 let Some(session) = self.find_session(session_id).await? else {
681 return Ok(None);
682 };
683 let messages = self.messages_for_session(session_id).await?;
684 Ok(Some(SessionWithMessages { session, messages }))
685 }
686
687 pub async fn session_ids(&self) -> Result<Vec<String>> {
689 let batch = self
690 .handle
691 .scan_batch(Table::Sessions, None, &["id"])
692 .await?;
693 let mut ids = Vec::with_capacity(batch.num_rows());
694 for row in 0..batch.num_rows() {
695 if let Some(id) = string(&batch, "id", row)? {
696 ids.push(id);
697 }
698 }
699 Ok(ids)
700 }
701
702 pub async fn child_sessions(&self, parent_session_id: &str) -> Result<Vec<Session>> {
703 let batch = self
704 .handle
705 .scan_batch(
706 Table::Sessions,
707 Some(&Predicate::Eq(
708 "parent_session_id",
709 parent_session_id.into(),
710 )),
711 &[
712 "id",
713 "parent_session_id",
714 "parent_message_id",
715 "source_agent",
716 "created_at",
717 "project",
718 "options",
719 ],
720 )
721 .await?;
722 let mut sessions = Vec::with_capacity(batch.num_rows());
723 for row in 0..batch.num_rows() {
724 sessions.push(session_from_batch(&batch, row)?);
725 }
726 sessions.sort_by(|left, right| left.id.cmp(&right.id));
727 Ok(sessions)
728 }
729
730 pub async fn session_last_ingested_at(&self) -> Result<HashMap<String, DateTime<Utc>>> {
736 use lance::deps::arrow_array::UInt64Array;
737
738 let dataset = self.handle.dataset(Table::Sessions).await?;
739 let version_list = dataset.versions().await?;
740 let versions: HashMap<u64, DateTime<Utc>> = version_list
741 .iter()
742 .map(|v| (v.version, v.timestamp))
743 .collect();
744 let oldest_visible_ts = version_list.iter().map(|v| v.timestamp).min();
752
753 let scanner = self
754 .handle
755 .scan(
756 Table::Sessions,
757 ScanOpts::project_only(&["id", "_row_last_updated_at_version"]),
758 )
759 .await?;
760 let mut stream = scanner.try_into_stream().await?;
761 let mut out: HashMap<String, DateTime<Utc>> = HashMap::new();
762 while let Some(batch) = stream.next().await {
763 let batch = batch?;
764 let version_array = batch
765 .column_by_name("_row_last_updated_at_version")
766 .context("missing _row_last_updated_at_version column")?
767 .as_any()
768 .downcast_ref::<UInt64Array>()
769 .context("_row_last_updated_at_version is not UInt64")?;
770 for row in 0..batch.num_rows() {
771 let Some(id) = string(&batch, "id", row)? else {
772 continue;
773 };
774 if version_array.is_null(row) {
775 continue;
776 }
777 let version = version_array.value(row);
778 let ts = versions.get(&version).copied().or(oldest_visible_ts);
779 if let Some(ts) = ts {
780 out.insert(id, ts);
781 }
782 }
783 }
784 Ok(out)
785 }
786
787 pub async fn session_view(
794 &self,
795 session_id: &str,
796 params: SessionViewParams<'_>,
797 ) -> Result<GetLookup<SessionPage>> {
798 let Some(session) = self.find_session(session_id).await? else {
799 return Ok(GetLookup::NotFound);
800 };
801
802 let mut rows = match params.mode {
803 ResponseMode::Conversational => self
804 .scan_conversational_messages(session_id)
805 .await?
806 .into_iter()
807 .map(|row| ScanRow {
808 id: row.message_id,
809 role: row.role,
810 timestamp: row.timestamp,
811 text: Some(row.text.into_inner()),
812 content: None,
813 })
814 .collect(),
815 ResponseMode::Complete | ResponseMode::Verbatim => {
816 self.scan_all_messages(session_id).await?
817 }
818 };
819 rows.sort_by(|a, b| a.timestamp.cmp(&b.timestamp).then_with(|| a.id.cmp(&b.id)));
820
821 let start_at = match params.after_id {
822 Some(after) => match rows.iter().position(|row| row.id == after) {
825 Some(idx) => idx + 1,
826 None => return Ok(GetLookup::UnknownAfterId),
827 },
828 None => 0,
829 };
830 let remaining = rows.get(start_at..).unwrap_or(&[]);
831 let emitted_count = page_by(remaining, params.limit, params.budget_bytes, |row| {
832 row.text.as_deref().map_or(0, str::len)
833 });
834 let emitted = &remaining[..emitted_count];
835 let messages_remaining = remaining.len() - emitted_count;
836 let ids: Vec<String> = emitted.iter().map(|row| row.id.clone()).collect();
837
838 let mut parts_by_message = match params.mode {
841 ResponseMode::Verbatim => self.parts_for_messages(session_id, &ids).await?,
842 ResponseMode::Conversational | ResponseMode::Complete => {
843 self.summary_parts_for_messages(session_id, &ids).await?
844 }
845 };
846 let messages = emitted
847 .iter()
848 .map(|row| RetrievedMessage {
849 id: row.id.clone(),
850 role: row.role,
851 timestamp: row.timestamp,
852 text: row.text.clone(),
853 content: row.content.clone(),
854 parts: parts_by_message
855 .remove(&(session_id.to_owned(), row.id.clone()))
856 .unwrap_or_default(),
857 })
858 .collect();
859
860 Ok(GetLookup::Found(SessionPage {
861 session,
862 messages,
863 messages_remaining,
864 }))
865 }
866
867 pub async fn message_view(
873 &self,
874 message_id: &str,
875 params: MessageViewParams<'_>,
876 ) -> Result<GetLookup<MessagePage>> {
877 let Some(session_id) = self.session_id_for_message(message_id).await? else {
878 return Ok(GetLookup::NotFound);
879 };
880 let Some(session) = self.find_session(&session_id).await? else {
881 return Ok(GetLookup::NotFound);
882 };
883 let mut rows = self.scan_all_messages(&session_id).await?;
884 rows.sort_by(|a, b| a.timestamp.cmp(&b.timestamp).then_with(|| a.id.cmp(&b.id)));
885 let Some(target_pos) = rows.iter().position(|row| row.id == message_id) else {
886 return Ok(GetLookup::NotFound);
887 };
888
889 let start = target_pos.saturating_sub(params.context_depth);
890 let end = (target_pos + params.context_depth + 1).min(rows.len());
891 let window = &rows[start..end];
892 let window_ids: Vec<String> = window.iter().map(|row| row.id.clone()).collect();
893 let mut parts_by_message = self.parts_for_messages(&session_id, &window_ids).await?;
896
897 let all_parts = parts_by_message
898 .remove(&(session_id.clone(), message_id.to_owned()))
899 .unwrap_or_default();
900 let start_part = match params.after_id {
901 Some(after) => match all_parts.iter().find(|part| part.id == after) {
905 Some(anchor) => all_parts
906 .iter()
907 .position(|part| part.ordinal > anchor.ordinal)
908 .unwrap_or(all_parts.len()),
909 None => return Ok(GetLookup::UnknownAfterId),
910 },
911 None => 0,
912 };
913 let remaining_parts = all_parts.get(start_part..).unwrap_or(&[]);
914 let part_count = page_by(remaining_parts, params.limit, params.budget_bytes, |part| {
915 serde_json::to_string(part).map_or(0, |json| json.len())
916 });
917 let target_parts = remaining_parts[..part_count].to_vec();
918 let target_parts_remaining = remaining_parts.len() - part_count;
919
920 let target_row = &rows[target_pos];
921 let target = RetrievedMessage {
922 id: target_row.id.clone(),
923 role: target_row.role,
924 timestamp: target_row.timestamp,
925 text: target_row.text.clone(),
926 content: target_row.content.clone(),
927 parts: Vec::new(),
929 };
930 let siblings = window
931 .iter()
932 .enumerate()
933 .filter(|(idx, _)| start + idx != target_pos)
934 .map(|(_, row)| RetrievedMessage {
935 id: row.id.clone(),
936 role: row.role,
937 timestamp: row.timestamp,
938 text: row.text.clone(),
939 content: row.content.clone(),
940 parts: parts_by_message
941 .get(&(session_id.clone(), row.id.clone()))
942 .cloned()
943 .unwrap_or_default(),
944 })
945 .collect();
946
947 Ok(GetLookup::Found(MessagePage {
948 session,
949 target,
950 target_parts,
951 target_parts_remaining,
952 siblings,
953 }))
954 }
955
956 async fn scan_all_messages(&self, session_id: &str) -> Result<Vec<ScanRow>> {
957 let batch = self
958 .handle
959 .scan_batch(
960 Table::Messages,
961 Some(&Predicate::Eq("session_id", session_id.into())),
962 &["id", "timestamp", "role", "search_text", "content"],
963 )
964 .await?;
965 let mut rows = Vec::with_capacity(batch.num_rows());
966 for row in 0..batch.num_rows() {
967 let id = string(&batch, "id", row)?.context("message id is null")?;
968 let role =
969 role_from_str(&string(&batch, "role", row)?.context("message role is null")?)?;
970 let timestamp = datetime(&batch, "timestamp", row)?;
971 rows.push(ScanRow {
972 id,
973 role,
974 timestamp,
975 text: string(&batch, "search_text", row)?,
976 content: string(&batch, "content", row)?,
977 });
978 }
979 Ok(rows)
980 }
981
982 pub async fn scan_conversational_messages(
986 &self,
987 session_id: &str,
988 ) -> Result<Vec<ConversationalRow>> {
989 let filter = Predicate::And(vec![
990 Predicate::Eq("session_id", session_id.into()),
991 Predicate::IsNotNull("search_text"),
992 ]);
993 let batch = self
994 .handle
995 .scan_batch(
996 Table::Messages,
997 Some(&filter),
998 &["id", "timestamp", "role", "search_text"],
999 )
1000 .await?;
1001
1002 let mut rows = Vec::with_capacity(batch.num_rows());
1003 for row in 0..batch.num_rows() {
1004 let message_id = string(&batch, "id", row)?.context("message id is null")?;
1005 let role =
1006 role_from_str(&string(&batch, "role", row)?.context("message role is null")?)?;
1007 let timestamp = datetime(&batch, "timestamp", row)?;
1008 let text_str = string(&batch, "search_text", row)?.context(
1009 "search_text null after IsNotNull pushdown - storage invariant violated",
1010 )?;
1011 rows.push(ConversationalRow {
1012 session_id: session_id.to_owned(),
1013 message_id,
1014 role,
1015 timestamp,
1016 text: SearchText(text_str),
1017 });
1018 }
1019 rows.sort_by(|a, b| {
1020 a.timestamp
1021 .cmp(&b.timestamp)
1022 .then_with(|| a.message_id.cmp(&b.message_id))
1023 });
1024 Ok(rows)
1025 }
1026
1027 pub async fn session_id_for_message(&self, message_id: &str) -> Result<Option<String>> {
1030 let batch = self
1031 .handle
1032 .scan_batch(
1033 Table::Messages,
1034 Some(&Predicate::Eq("id", message_id.into())),
1035 &["session_id"],
1036 )
1037 .await?;
1038 if batch.num_rows() == 0 {
1039 return Ok(None);
1040 }
1041 string(&batch, "session_id", 0)
1042 }
1043
1044 pub async fn row_counts(&self) -> Result<(usize, usize, usize)> {
1045 self.handle.row_counts().await
1046 }
1047
1048 pub async fn corpus_stats(&self, include_subagents: bool) -> Result<CorpusStats> {
1054 let scanner = self
1055 .handle
1056 .scan(
1057 Table::Messages,
1058 ScanOpts::project_only(&["source_agent", "project", "session_id"]),
1059 )
1060 .await?;
1061 let mut stream = scanner.try_into_stream().await?;
1062 let mut groups: HashMap<(String, String), GroupAccumulator> = HashMap::new();
1063 while let Some(batch) = stream.next().await {
1064 let batch = batch?;
1065 for row in 0..batch.num_rows() {
1066 let source_agent = string(&batch, "source_agent", row)?.unwrap_or_default();
1067 let project = string(&batch, "project", row)?.unwrap_or_default();
1068 let session_id = string(&batch, "session_id", row)?.unwrap_or_default();
1069 let is_subagent = source_agent.contains('/');
1070 if is_subagent && !include_subagents {
1071 continue;
1072 }
1073 let entry = groups.entry((source_agent, project)).or_default();
1074 entry.messages += 1;
1075 entry.session_ids.insert(session_id);
1076 }
1077 }
1078
1079 let (totals_sessions, totals_messages, totals_parts) = self.handle.row_counts().await?;
1080 let totals = RowTotals {
1081 sessions: totals_sessions as u64,
1082 messages: totals_messages as u64,
1083 parts: totals_parts as u64,
1084 };
1085
1086 let mut by_adapter: BTreeMap<String, Vec<ProjectStats>> = BTreeMap::new();
1087 for ((adapter, project), acc) in groups {
1088 by_adapter.entry(adapter).or_default().push(ProjectStats {
1089 project,
1090 sessions: acc.session_ids.len() as u64,
1091 messages: acc.messages,
1092 });
1093 }
1094
1095 let mut adapters = Vec::with_capacity(by_adapter.len());
1096 for (adapter, mut projects) in by_adapter {
1097 projects.sort_by(|a, b| {
1098 b.messages
1099 .cmp(&a.messages)
1100 .then_with(|| a.project.cmp(&b.project))
1101 });
1102 let sessions: u64 = projects.iter().map(|p| p.sessions).sum();
1103 let messages: u64 = projects.iter().map(|p| p.messages).sum();
1104 adapters.push(AdapterStats {
1105 adapter,
1106 sessions,
1107 messages,
1108 projects,
1109 });
1110 }
1111
1112 Ok(CorpusStats {
1113 data_url: self.handle.location().clone(),
1114 totals,
1115 adapters,
1116 include_subagents,
1117 })
1118 }
1119
1120 pub async fn write_embeddings(&self, rows: &[EmbeddedMessage]) -> Result<()> {
1125 if rows.is_empty() {
1126 return Ok(());
1127 }
1128 let batch = embedding_update_batch(rows)?;
1129 self.handle
1130 .merge_update(Table::Messages, batch, rows.len())
1131 .await?;
1132 Ok(())
1133 }
1134
1135 pub fn pending_embedding_messages(&self) -> impl Stream<Item = Result<PendingMessage>> + '_ {
1138 try_stream! {
1139 let filter = Predicate::And(vec![
1140 Predicate::IsNull("vector"),
1141 Predicate::IsNotNull("search_text"),
1142 ]);
1143 let projection: &[&str] = &["session_id", "id", "search_text"];
1144 let scanner = self
1145 .handle
1146 .scan(
1147 Table::Messages,
1148 ScanOpts::with_predicate_and_projection(&filter, projection),
1149 )
1150 .await?;
1151 let mut batches = scanner
1152 .try_into_stream()
1153 .await
1154 .context("failed to open messages stream")?;
1155 while let Some(batch) = batches.next().await {
1156 let batch = batch?;
1157 for row in 0..batch.num_rows() {
1158 yield PendingMessage {
1159 session_id: string(&batch, "session_id", row)?
1160 .context("session_id is null")?,
1161 id: string(&batch, "id", row)?.context("message id is null")?,
1162 search_text: string(&batch, "search_text", row)?
1163 .context("search_text is null")?,
1164 };
1165 }
1166 }
1167 }
1168 }
1169
1170 pub fn pending_or_stale_messages(&self) -> impl Stream<Item = Result<PendingMessage>> + '_ {
1175 try_stream! {
1176 let filter = Predicate::And(vec![
1177 Predicate::IsNotNull("search_text"),
1178 Predicate::Or(vec![
1179 Predicate::IsNull("vector"),
1180 Predicate::Ne("embedding_model", embed::model_id().into()),
1181 ]),
1182 ]);
1183 let projection: &[&str] = &["session_id", "id", "search_text"];
1184 let scanner = self
1185 .handle
1186 .scan(
1187 Table::Messages,
1188 ScanOpts::with_predicate_and_projection(&filter, projection),
1189 )
1190 .await?;
1191 let mut batches = scanner
1192 .try_into_stream()
1193 .await
1194 .context("failed to open pending-or-stale messages stream")?;
1195 while let Some(batch) = batches.next().await {
1196 let batch = batch?;
1197 for row in 0..batch.num_rows() {
1198 yield PendingMessage {
1199 session_id: string(&batch, "session_id", row)?
1200 .context("session_id is null")?,
1201 id: string(&batch, "id", row)?.context("message id is null")?,
1202 search_text: string(&batch, "search_text", row)?
1203 .context("search_text is null")?,
1204 };
1205 }
1206 }
1207 }
1208 }
1209
1210 pub async fn fts_search(
1212 &self,
1213 query: &str,
1214 limit: usize,
1215 filter: &Predicate,
1216 ) -> Result<Vec<(MessageKey, f32)>> {
1217 let mut scanner = self.handle.scanner(Table::Messages, Some(filter)).await?;
1218 scanner.full_text_search(
1219 FullTextSearchQuery::new(query.to_owned()).with_column("search_text".to_owned())?,
1220 )?;
1221 scanner.disable_scoring_autoprojection();
1227 scanner.project(&["session_id", "id", "_score"])?;
1228 scanner.limit(Some(i64::try_from(limit).unwrap_or(i64::MAX)), None)?;
1229 let batch = scanner.try_into_batch().await?;
1230 let mut hits = Vec::with_capacity(batch.num_rows());
1231 for row in 0..batch.num_rows() {
1232 let key = MessageKey {
1233 session_id: string(&batch, "session_id", row)?.context("session_id is null")?,
1234 message_id: string(&batch, "id", row)?.context("fts hit id is null")?,
1235 };
1236 hits.push((key, float32(&batch, "_score", row)?));
1237 }
1238 hits.sort_by(|left, right| {
1246 right
1247 .1
1248 .partial_cmp(&left.1)
1249 .unwrap_or(std::cmp::Ordering::Equal)
1250 .then_with(|| left.0.session_id.cmp(&right.0.session_id))
1251 .then_with(|| left.0.message_id.cmp(&right.0.message_id))
1252 });
1253 Ok(hits)
1254 }
1255
1256 pub async fn has_embeddings(&self) -> Result<bool> {
1261 let scope = Predicate::IsNotNull("vector");
1262 let mut scanner = self
1263 .handle
1264 .scan(
1265 Table::Messages,
1266 ScanOpts::with_predicate_and_projection(&scope, &["id"]),
1267 )
1268 .await?;
1269 scanner.limit(Some(1), None)?;
1270 let batch = scanner.try_into_batch().await?;
1271 Ok(batch.num_rows() > 0)
1272 }
1273
1274 pub async fn vector_search(
1282 &self,
1283 query: &[f32],
1284 limit: usize,
1285 filter: &Predicate,
1286 search: Option<&config::SearchConfig>,
1287 ) -> Result<Vec<(MessageKey, f32)>> {
1288 let scope = embedded_scope(filter);
1289 let mut scanner = self.handle.scanner(Table::Messages, Some(&scope)).await?;
1290 let key = Float32Array::from(query.to_vec());
1291 scanner.nearest("vector", &key, limit)?;
1292 if let Some(nprobes) = search.and_then(|cfg| cfg.nprobes) {
1293 scanner.nprobes(nprobes);
1294 }
1295 if let Some(refine_factor) = search.and_then(|cfg| cfg.refine_factor) {
1296 scanner.refine(refine_factor);
1297 }
1298 scanner.disable_scoring_autoprojection();
1302 scanner.project(&["session_id", "id", "_distance"])?;
1303 let batch = scanner.try_into_batch().await?;
1304 let mut hits = Vec::with_capacity(batch.num_rows());
1305 for row in 0..batch.num_rows() {
1306 let key = MessageKey {
1307 session_id: string(&batch, "session_id", row)?.context("session_id is null")?,
1308 message_id: string(&batch, "id", row)?.context("message id is null")?,
1309 };
1310 hits.push((key, float32(&batch, "_distance", row)?));
1311 }
1312 hits.sort_by(|left, right| {
1318 left.1
1319 .partial_cmp(&right.1)
1320 .unwrap_or(std::cmp::Ordering::Equal)
1321 .then_with(|| left.0.session_id.cmp(&right.0.session_id))
1322 .then_with(|| left.0.message_id.cmp(&right.0.message_id))
1323 });
1324 Ok(hits)
1325 }
1326
1327 pub async fn explain_vector_plan(
1330 &self,
1331 query: &[f32],
1332 limit: usize,
1333 filter: &Predicate,
1334 search: Option<&config::SearchConfig>,
1335 ) -> Result<String> {
1336 let scope = embedded_scope(filter);
1337 let mut scanner = self.handle.scanner(Table::Messages, Some(&scope)).await?;
1338 let key = Float32Array::from(query.to_vec());
1339 scanner.nearest("vector", &key, limit)?;
1340 if let Some(nprobes) = search.and_then(|cfg| cfg.nprobes) {
1341 scanner.nprobes(nprobes);
1342 }
1343 if let Some(refine_factor) = search.and_then(|cfg| cfg.refine_factor) {
1344 scanner.refine(refine_factor);
1345 }
1346 scanner
1347 .explain_plan(true)
1348 .await
1349 .context("explain_plan failed")
1350 }
1351
1352 pub async fn explain_fts_plan(
1353 &self,
1354 query: &str,
1355 limit: usize,
1356 filter: &Predicate,
1357 ) -> Result<String> {
1358 let mut scanner = self.handle.scanner(Table::Messages, Some(filter)).await?;
1359 scanner.full_text_search(
1360 FullTextSearchQuery::new(query.to_owned()).with_column("search_text".to_owned())?,
1361 )?;
1362 scanner.project(&["session_id", "id"])?;
1363 scanner.limit(Some(i64::try_from(limit).unwrap_or(i64::MAX)), None)?;
1364 scanner
1365 .explain_plan(true)
1366 .await
1367 .context("explain_plan failed")
1368 }
1369
1370 pub async fn message_metas_by_keys(&self, keys: &[MessageKey]) -> Result<Vec<MessageMeta>> {
1372 if keys.is_empty() {
1373 return Ok(Vec::new());
1374 }
1375 let wanted = keys.iter().cloned().collect::<HashSet<_>>();
1376 let session_ids = keys
1377 .iter()
1378 .map(|key| key.session_id.clone())
1379 .collect::<Vec<_>>();
1380 let message_ids = keys
1381 .iter()
1382 .map(|key| key.message_id.clone())
1383 .collect::<Vec<_>>();
1384 let predicate = Predicate::And(vec![
1385 in_predicate("session_id", &session_ids),
1386 in_predicate("id", &message_ids),
1387 ]);
1388 let batch = self
1389 .handle
1390 .scan_batch(
1391 Table::Messages,
1392 Some(&predicate),
1393 &[
1394 "id",
1395 "session_id",
1396 "role",
1397 "project",
1398 "source_agent",
1399 "timestamp",
1400 "search_text",
1401 ],
1402 )
1403 .await?;
1404 let mut metas = Vec::with_capacity(batch.num_rows());
1405 for row in 0..batch.num_rows() {
1406 let message_id = string(&batch, "id", row)?.context("id is null")?;
1407 let session_id = string(&batch, "session_id", row)?.context("session_id is null")?;
1408 if !wanted.contains(&MessageKey {
1409 session_id: session_id.clone(),
1410 message_id: message_id.clone(),
1411 }) {
1412 continue;
1413 }
1414 metas.push(MessageMeta {
1415 message_id,
1416 session_id,
1417 role: string(&batch, "role", row)?.context("role is null")?,
1418 project: string(&batch, "project", row)?.context("project is null")?,
1419 source_agent: string(&batch, "source_agent", row)?
1420 .context("source_agent is null")?,
1421 timestamp: datetime(&batch, "timestamp", row)?,
1422 search_text: string(&batch, "search_text", row)?.unwrap_or_default(),
1423 });
1424 }
1425 Ok(metas)
1426 }
1427
1428 pub async fn session_message_counts(
1430 &self,
1431 session_ids: &[String],
1432 ) -> Result<BTreeMap<String, usize>> {
1433 if session_ids.is_empty() {
1434 return Ok(BTreeMap::new());
1435 }
1436 let dataset = self.handle.dataset(Table::Messages).await?;
1437 let mut tasks = tokio::task::JoinSet::new();
1438 for session_id in session_ids {
1439 let dataset = dataset.clone();
1440 let session_id = session_id.clone();
1441 tasks.spawn(async move {
1442 let filter = Predicate::Eq("session_id", session_id.as_str().into()).to_lance();
1443 let count = dataset.count_rows(Some(filter)).await?;
1444 anyhow::Ok((session_id, count))
1445 });
1446 }
1447 let mut counts = BTreeMap::new();
1448 while let Some(joined) = tasks.join_next().await {
1449 let (session_id, count) = joined.context("session count task panicked")??;
1450 counts.insert(session_id, count);
1451 }
1452 Ok(counts)
1453 }
1454
1455 pub async fn unindexed_message_backlog(&self) -> Result<usize> {
1458 self.handle
1459 .unindexed_row_count(Table::Messages, MESSAGES_FTS_INDEX)
1460 .await
1461 }
1462
1463 pub async fn unindexed_vector_backlog(&self) -> Result<usize> {
1469 self.handle
1470 .unindexed_row_count(Table::Messages, MESSAGES_VECTOR_INDEX)
1471 .await
1472 }
1473
1474 pub async fn embedding_progress(&self) -> Result<EmbeddingProgress> {
1481 let dataset = self.handle.dataset(Table::Messages).await?;
1482 let embedded = dataset
1483 .count_rows(Some(Predicate::IsNotNull("vector").to_lance()))
1484 .await?;
1485 let total = dataset
1486 .count_rows(Some(Predicate::IsNotNull("search_text").to_lance()))
1487 .await?;
1488 Ok(EmbeddingProgress {
1489 embedded,
1490 total,
1491 model: embed::model_id(),
1492 })
1493 }
1494
1495 pub async fn stale_embedding_count(&self) -> Result<usize> {
1499 let dataset = self.handle.dataset(Table::Messages).await?;
1500 dataset
1501 .count_rows(Some(
1502 Predicate::And(vec![
1503 Predicate::IsNotNull("vector"),
1504 Predicate::Ne("embedding_model", embed::model_id().into()),
1505 ])
1506 .to_lance(),
1507 ))
1508 .await
1509 .map_err(Into::into)
1510 }
1511
1512 pub async fn optimize_indices(
1518 &self,
1519 progress: Option<OptimizeProgressFn>,
1520 cleanup: Option<CleanupConfig>,
1521 ) -> Result<OptimizeOutcome> {
1522 let cleanup = cleanup.unwrap_or_default();
1523 let policy = pond_index_intents();
1524 let mut tables = Vec::with_capacity(3);
1525 for (table, intents) in policy.all() {
1526 let outcome = self
1527 .handle
1528 .optimize_table(table, intents, progress.as_ref(), cleanup)
1529 .await;
1530 tables.push(outcome);
1531 }
1532 Ok(OptimizeOutcome { tables })
1533 }
1534
1535 pub async fn build_indices_only(
1541 &self,
1542 progress: Option<OptimizeProgressFn>,
1543 ) -> Result<OptimizeOutcome> {
1544 let policy = pond_index_intents();
1545 let mut tables = Vec::with_capacity(3);
1546 for (table, intents) in policy.all() {
1547 let indices = self
1548 .handle
1549 .optimize_table_indices_only(table, intents, progress.as_ref())
1550 .await;
1551 tables.push(TableOptimizeOutcome {
1552 table,
1553 indices,
1554 compaction: PhaseOutcome::NotAttempted,
1555 });
1556 }
1557 Ok(OptimizeOutcome { tables })
1558 }
1559
1560 #[cfg(test)]
1561 async fn optimize_indices_with_vector_threshold(
1562 &self,
1563 vector_threshold: usize,
1564 ) -> Result<OptimizeOutcome> {
1565 let cleanup = CleanupConfig::default();
1566 let policy = pond_index_intents_with_vector_threshold(vector_threshold);
1567 let mut tables = Vec::with_capacity(3);
1568 for (table, intents) in policy.all() {
1569 let outcome = self
1570 .handle
1571 .optimize_table(table, intents, None, cleanup)
1572 .await;
1573 tables.push(outcome);
1574 }
1575 Ok(OptimizeOutcome { tables })
1576 }
1577
1578 pub async fn rebuild_indices(&self, intent_name: Option<&str>) -> Result<()> {
1579 let policy = pond_index_intents();
1580 let mut matched = false;
1581 for (table, intents) in policy.all() {
1582 for intent in intents {
1583 if intent_name.is_none_or(|name| name == intent.name) {
1584 matched = true;
1585 self.handle.rebuild_index(table, intent).await?;
1586 }
1587 }
1588 }
1589 if let Some(name) = intent_name
1590 && !matched
1591 {
1592 anyhow::bail!("unknown index intent {name:?}");
1593 }
1594 Ok(())
1595 }
1596
1597 pub async fn index_status(&self) -> Result<Vec<IndexStatus>> {
1598 let policy = pond_index_intents();
1599 let mut statuses = Vec::new();
1600 for (table, intents) in policy.all() {
1601 statuses.extend(self.handle.index_status(table, intents).await?);
1602 }
1603 Ok(statuses)
1604 }
1605
1606 pub async fn drop_vector_index(&self) -> Result<()> {
1610 match self
1611 .handle
1612 .drop_index(Table::Messages, MESSAGES_VECTOR_INDEX)
1613 .await
1614 {
1615 Ok(()) => Ok(()),
1616 Err(error) => {
1617 let msg = error.to_string();
1618 if msg.contains("not found") || msg.contains("does not exist") {
1619 Ok(())
1620 } else {
1621 Err(error)
1622 }
1623 }
1624 }
1625 }
1626
1627 pub async fn table_sizes(&self) -> Result<TableSizes> {
1630 self.handle.table_sizes().await
1631 }
1632
1633 pub async fn text_script_histogram(&self, max_messages: usize) -> Result<Vec<(String, usize)>> {
1640 use std::collections::HashMap;
1641 let filter = Predicate::IsNotNull("search_text");
1642 let projection: &[&str] = &["search_text"];
1643 let scanner = self
1644 .handle
1645 .scan(
1646 Table::Messages,
1647 ScanOpts::with_predicate_and_projection(&filter, projection),
1648 )
1649 .await?;
1650 let mut batches = scanner
1651 .try_into_stream()
1652 .await
1653 .context("failed to open messages stream for script histogram")?;
1654 let mut counts: HashMap<&'static str, usize> = HashMap::new();
1655 let mut sampled = 0usize;
1656 'outer: while let Some(batch) = batches.next().await {
1657 let batch = batch?;
1658 for row in 0..batch.num_rows() {
1659 if sampled >= max_messages {
1660 break 'outer;
1661 }
1662 if let Some(text) = string(&batch, "search_text", row)? {
1663 for ch in text.chars() {
1664 if let Some(class) = classify_script(ch) {
1665 *counts.entry(class).or_default() += 1;
1666 }
1667 }
1668 sampled += 1;
1669 }
1670 }
1671 }
1672 let mut histogram: Vec<(String, usize)> = counts
1673 .into_iter()
1674 .map(|(name, count)| (name.to_owned(), count))
1675 .collect();
1676 histogram.sort_by(|left, right| right.1.cmp(&left.1).then(left.0.cmp(&right.0)));
1677 Ok(histogram)
1678 }
1679
1680 async fn find_session(&self, session_id: &str) -> Result<Option<Session>> {
1681 let batch = self
1682 .handle
1683 .scan_batch(
1684 Table::Sessions,
1685 Some(&Predicate::Eq("id", session_id.into())),
1686 &[],
1687 )
1688 .await?;
1689 if batch.num_rows() == 0 {
1690 Ok(None)
1691 } else {
1692 Ok(Some(session_from_batch(&batch, 0)?))
1693 }
1694 }
1695
1696 pub async fn message_vector_by_id(&self, message_id: &str) -> Result<Option<Vec<f32>>> {
1702 let batch = self
1703 .handle
1704 .scan_batch(
1705 Table::Messages,
1706 Some(&Predicate::Eq("id", message_id.into())),
1707 &["vector"],
1708 )
1709 .await?;
1710 if batch.num_rows() == 0 {
1711 return Ok(None);
1712 }
1713 let column = batch
1714 .column(0)
1715 .as_any()
1716 .downcast_ref::<FixedSizeListArray>();
1717 let Some(list) = column else {
1718 return Ok(None);
1719 };
1720 if list.is_null(0) {
1721 return Ok(None);
1722 }
1723 let values = list.value(0);
1724 let halves = values
1725 .as_any()
1726 .downcast_ref::<Float16Array>()
1727 .context("messages.vector inner array is not Float16")?;
1728 let widened = (0..halves.len())
1729 .map(|i| halves.value(i).to_f32())
1730 .collect();
1731 Ok(Some(widened))
1732 }
1733
1734 async fn messages_for_session(&self, session_id: &str) -> Result<Vec<MessageWithParts>> {
1735 let batch = self
1736 .handle
1737 .scan_batch(
1738 Table::Messages,
1739 Some(&Predicate::Eq("session_id", session_id.into())),
1740 &[
1741 "session_id",
1742 "id",
1743 "timestamp",
1744 "role",
1745 "content",
1746 "options",
1747 ],
1748 )
1749 .await?;
1750 let mut messages = Vec::with_capacity(batch.num_rows());
1751 for row in 0..batch.num_rows() {
1752 messages.push(message_from_batch(&batch, row)?);
1753 }
1754 messages.sort_by(|left, right| {
1755 left.timestamp()
1756 .cmp(&right.timestamp())
1757 .then_with(|| left.id().cmp(right.id()))
1758 });
1759
1760 let message_ids = messages
1761 .iter()
1762 .map(|message| message.id().to_owned())
1763 .collect::<Vec<_>>();
1764 let mut parts_by_message = self.parts_for_messages(session_id, &message_ids).await?;
1765
1766 Ok(messages
1767 .into_iter()
1768 .map(|message| {
1769 let key = (message.session_id().to_owned(), message.id().to_owned());
1770 let parts = parts_by_message.remove(&key).unwrap_or_default();
1771 MessageWithParts { message, parts }
1772 })
1773 .collect())
1774 }
1775
1776 pub async fn parts_for_messages(
1780 &self,
1781 session_id: &str,
1782 message_ids: &[String],
1783 ) -> Result<BTreeMap<(String, String), Vec<Part>>> {
1784 self.scan_parts(session_id, message_ids, None).await
1785 }
1786
1787 pub async fn summary_parts_for_messages(
1792 &self,
1793 session_id: &str,
1794 message_ids: &[String],
1795 ) -> Result<BTreeMap<(String, String), Vec<Part>>> {
1796 self.scan_parts(session_id, message_ids, Some(SUMMARY_PART_TYPES))
1797 .await
1798 }
1799
1800 async fn scan_parts(
1801 &self,
1802 session_id: &str,
1803 message_ids: &[String],
1804 part_types: Option<&[&str]>,
1805 ) -> Result<BTreeMap<(String, String), Vec<Part>>> {
1806 if message_ids.is_empty() {
1807 return Ok(BTreeMap::new());
1808 }
1809 let mut clauses = vec![
1810 Predicate::Eq("session_id", session_id.into()),
1811 in_predicate("message_id", message_ids),
1812 ];
1813 if let Some(types) = part_types {
1814 clauses.push(Predicate::In(
1815 "type",
1816 types.iter().map(|&t| t.into()).collect(),
1817 ));
1818 }
1819 let predicate = Predicate::And(clauses);
1820 let dataset = std::sync::Arc::new(self.handle.dataset(Table::Parts).await?);
1821 let mut scanner = self
1822 .handle
1823 .scan(
1824 Table::Parts,
1825 ScanOpts::with_predicate_and_projection(
1826 &predicate,
1827 &[
1828 "session_id",
1829 "message_id",
1830 "id",
1831 "ordinal",
1832 "type",
1833 "provenance",
1834 "variant_data",
1835 "options",
1836 ],
1837 ),
1838 )
1839 .await?;
1840 scanner.with_row_address();
1841 let batch = scanner.try_into_batch().await.context("scan failed")?;
1842 let row_addresses = uint64(&batch, "_rowaddr")?;
1843 let mut file_payloads = BTreeMap::<usize, FileData>::new();
1844 let mut file_rows = Vec::<(usize, u64, Vec<u8>)>::new();
1845 for row in 0..batch.num_rows() {
1846 if string(&batch, "type", row)?.as_deref() == Some("file") {
1847 let variant_data =
1848 json_column(&batch, "variant_data", row)?.context("variant_data is null")?;
1849 file_rows.push((row, row_addresses.value(row), variant_data));
1850 }
1851 }
1852 if !file_rows.is_empty() {
1853 let addresses = file_rows
1854 .iter()
1855 .map(|(_, address, _)| *address)
1856 .collect::<Vec<_>>();
1857 let blobs = dataset.take_blobs_by_addresses(&addresses, "data").await?;
1858 for ((row, _, variant_data), blob) in file_rows.into_iter().zip(blobs) {
1859 let payload = file_data_from_blob(&variant_data, &blob.read().await?)?;
1863 file_payloads.insert(row, payload);
1864 }
1865 }
1866 let mut parts_by_message = BTreeMap::<(String, String), Vec<Part>>::new();
1867 for row in 0..batch.num_rows() {
1868 let part = part_from_batch(&batch, row, file_payloads.remove(&row))?;
1869 parts_by_message
1870 .entry((part.session_id.clone(), part.message_id.clone()))
1871 .or_default()
1872 .push(part);
1873 }
1874 for parts in parts_by_message.values_mut() {
1875 parts.sort_by_key(|part| part.ordinal);
1876 }
1877 Ok(parts_by_message)
1878 }
1879}
1880
/// One event of an ingest stream.
///
/// Serialized adjacently tagged with snake_case variant names, e.g.
/// `{"kind": "session", "data": { ... }}`. `IngestValidator::push` expects a
/// `Session` first, then each `Message` followed by that message's `Part`s.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "kind", content = "data", rename_all = "snake_case")]
pub enum IngestEvent {
    /// Opens a new session substream, closing any previous one.
    Session(Session),
    /// A message belonging to the currently open session.
    Message(Message),
    /// A part belonging to the most recently accepted message.
    Part(Part),
}
1888
/// Running totals for an ingest run.
///
/// `add_outcomes` maintains `inserted`, `matched`, `dropped_events`,
/// `dropped_sessions` and `drop_reasons`. The remaining counters are not touched
/// by `add_outcomes`; presumably the ingest driver updates them (confirm at the
/// call sites).
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct IngestSummary {
    /// Rows reported as `OutcomeStatus::Inserted`.
    pub inserted: usize,
    /// Rows reported as `OutcomeStatus::Matched`.
    pub matched: usize,
    /// Error outcomes for non-session rows (messages and parts).
    pub dropped_events: usize,
    /// Error outcomes whose `kind` is `"session"`.
    pub dropped_sessions: usize,
    // NOTE(review): the next five counters are not updated in this section;
    // their exact meaning should be confirmed where they are incremented.
    pub skipped_files: usize,
    pub skipped_empty: usize,
    pub skipped_fresh: usize,
    pub storage_errors: usize,
    pub truncated_values: usize,
    /// Error outcomes per `reason_key`; `DROP_REASON_UNCATEGORIZED` when absent.
    pub drop_reasons: BTreeMap<&'static str, usize>,
}
1943
// Machine-readable `RowError::reason_key` values, aggregated in
// `IngestSummary::drop_reasons`.

/// A message id repeated within one session substream.
pub const DROP_REASON_DUPLICATE_MESSAGE_ID: &str = "duplicate_message_id";
/// A `(message_id, part_id)` pair repeated within one session substream.
pub const DROP_REASON_DUPLICATE_PART_KEY: &str = "duplicate_part_key";
/// A message arrived while no session was open.
pub const DROP_REASON_MESSAGE_BEFORE_SESSION: &str = "message_before_session";
/// A message's `session_id` differs from the open session's id.
pub const DROP_REASON_MESSAGE_SESSION_MISMATCH: &str = "message_session_mismatch";
/// A part arrived while no message was current.
pub const DROP_REASON_PART_BEFORE_MESSAGE: &str = "part_before_message";
/// A part's `session_id` or `message_id` differs from the current message.
pub const DROP_REASON_PART_MESSAGE_MISMATCH: &str = "part_message_mismatch";
/// A session whose `source_agent` is empty after trimming.
pub const DROP_REASON_EMPTY_SOURCE_AGENT: &str = "empty_source_agent";
/// A session with `parent_message_id` set but no `parent_session_id`.
pub const DROP_REASON_PARENT_MESSAGE_WITHOUT_SESSION: &str = "parent_message_without_session";
/// Presumably: an ingest tried to change a stored session's `project`
/// (cf. `ensure_immutable_match`) — confirm at the use site.
pub const DROP_REASON_IMMUTABLE_PROJECT: &str = "immutable_project";
/// Presumably: an ingest tried to change a stored session's `source_agent`
/// (cf. `ensure_immutable_match`) — confirm at the use site.
pub const DROP_REASON_IMMUTABLE_SOURCE_AGENT: &str = "immutable_source_agent";
/// Fallback bucket for error outcomes that carry no `reason_key`.
pub const DROP_REASON_UNCATEGORIZED: &str = "uncategorized";
1960
1961impl IngestSummary {
1962 pub fn accepted(&self) -> usize {
1963 self.inserted + self.matched
1964 }
1965
1966 pub fn add_outcomes(&mut self, outcomes: &[RowOutcome]) {
1967 for outcome in outcomes {
1968 match outcome.status {
1969 OutcomeStatus::Inserted => self.inserted += 1,
1970 OutcomeStatus::Matched => self.matched += 1,
1971 OutcomeStatus::Error => {
1972 if outcome.kind == "session" {
1978 self.dropped_sessions += 1;
1979 } else {
1980 self.dropped_events += 1;
1981 }
1982 let reason = outcome
1983 .error
1984 .as_ref()
1985 .and_then(|e| e.reason_key)
1986 .unwrap_or(DROP_REASON_UNCATEGORIZED);
1987 *self.drop_reasons.entry(reason).or_insert(0) += 1;
1988 }
1989 }
1990 }
1991 }
1992}
1993
/// The result of ingesting one event.
#[derive(Debug, Clone, PartialEq)]
pub struct RowOutcome {
    /// Position of the originating event in the input stream.
    pub index: usize,
    /// `"session"`, `"message"` or `"part"`.
    pub kind: &'static str,
    /// Primary key as JSON: the session id string for sessions,
    /// `[session_id, message_id]` for messages, and
    /// `[session_id, message_id, part_id]` for parts.
    pub pk: Value,
    pub status: OutcomeStatus,
    /// Populated for `OutcomeStatus::Error`; `None` on success.
    pub error: Option<RowError>,
}
2006
/// Per-row ingest result.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OutcomeStatus {
    /// Mapped from `UpsertStatus::Inserted`.
    Inserted,
    /// Mapped from `UpsertStatus::Matched`.
    Matched,
    /// The row was rejected; see `RowOutcome::error`.
    Error,
}
2013
/// Why a row was rejected.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RowError {
    /// Human-readable description.
    pub message: String,
    /// Offending field, when one can be named.
    pub field: Option<&'static str>,
    /// Short category; in this section only `"immutable"` is set, and only by
    /// `error_outcomes_for_substream` when a field is given.
    pub reason: Option<&'static str>,
    /// One of the `DROP_REASON_*` constants.
    pub reason_key: Option<&'static str>,
}
2027
/// The currently open session together with its event index.
#[derive(Debug)]
struct BufferedSession {
    index: usize,
    session: Session,
}
2036
/// An accepted message buffered with its parts until the substream is written.
#[derive(Debug)]
struct BufferedMessage {
    index: usize,
    message: Message,
    parts: Vec<BufferedPart>,
    /// Filled by `search_text` when the message is flushed; `None` when the
    /// message has no searchable text.
    search_text: Option<String>,
}
2044
/// An accepted part together with its event index.
#[derive(Debug)]
struct BufferedPart {
    index: usize,
    part: Part,
}
2050
/// Streaming validator that groups ingest events into per-session substreams.
///
/// Events are checked for ordering and key consistency as they arrive. Accepted
/// rows are buffered, and each closed substream is queued in `completed` until
/// `flush`/`finish` writes it through `Store::upsert_session_batch`.
#[derive(Debug, Default)]
pub struct IngestValidator {
    /// The open session, if any.
    session: Option<BufferedSession>,
    /// The message whose parts are still being collected.
    current_message: Option<BufferedMessage>,
    /// Parts accepted for `current_message` so far.
    current_parts: Vec<BufferedPart>,
    /// Finished messages of the open session.
    messages: Vec<BufferedMessage>,
    /// Message ids already accepted in the open substream.
    seen_message_ids: HashSet<String>,
    /// `(message_id, part_id)` pairs already accepted in the open substream.
    seen_part_keys: HashSet<(String, String)>,
    /// Closed substreams awaiting `flush`.
    completed: Vec<CompletedSubstream>,
}
2085
/// A closed session substream queued for `Store::upsert_session_batch`.
#[derive(Debug)]
struct CompletedSubstream {
    session_index: usize,
    session: Session,
    messages: Vec<BufferedMessage>,
}
2093
impl IngestValidator {
    /// Feeds one event into the validator.
    ///
    /// `index` is the event's position in the input stream and is echoed back in
    /// any returned outcome. Rejected events come back as `Error` outcomes rather
    /// than `Err`. Accepted events are buffered and produce no outcome until
    /// their substream is flushed.
    pub async fn push(
        &mut self,
        store: &Store,
        index: usize,
        event: IngestEvent,
    ) -> Result<Vec<RowOutcome>> {
        match event {
            IngestEvent::Session(session) => self.push_session(store, index, session).await,
            IngestEvent::Message(message) => Ok(self.push_message(index, message)),
            IngestEvent::Part(part) => Ok(self.push_part(index, part)),
        }
    }

    /// Closes the open substream (if any) and writes everything queued.
    /// Call this once at end of input.
    pub async fn finish(&mut self, store: &Store) -> Result<Vec<RowOutcome>> {
        self.close_current_substream();
        self.flush(store).await
    }

    /// Writes all *closed* substreams via `Store::upsert_session_batch` and
    /// returns their outcomes. The substream still being assembled is not
    /// included.
    pub async fn flush(&mut self, store: &Store) -> Result<Vec<RowOutcome>> {
        if self.completed.is_empty() {
            return Ok(Vec::new());
        }
        // The queue is taken before the upsert, so on failure the batch is not
        // kept for a retry.
        let completed = std::mem::take(&mut self.completed);
        store.upsert_session_batch(completed).await
    }

    /// Number of closed substreams waiting for `flush`.
    pub fn pending_substreams(&self) -> usize {
        self.completed.len()
    }

    /// Starts a new session substream after closing the previous one.
    ///
    /// The session is rejected when its `source_agent` is blank after trimming,
    /// or when it has a `parent_message_id` without a `parent_session_id`. A
    /// rejected session leaves no session open, so the messages that follow it
    /// are rejected as `DROP_REASON_MESSAGE_BEFORE_SESSION`. Accepted sessions
    /// have `source_agent` trimmed.
    async fn push_session(
        &mut self,
        _store: &Store,
        index: usize,
        mut session: Session,
    ) -> Result<Vec<RowOutcome>> {
        self.close_current_substream();

        let trimmed = session.source_agent.trim();
        if trimmed.is_empty() {
            return Ok(vec![RowOutcome {
                index,
                kind: "session",
                pk: Value::String(session.id.clone()),
                status: OutcomeStatus::Error,
                error: Some(RowError {
                    message: format!("session {} has empty source_agent after trim", session.id),
                    field: Some("source_agent"),
                    reason: None,
                    reason_key: Some(DROP_REASON_EMPTY_SOURCE_AGENT),
                }),
            }]);
        }
        // Only allocate when trimming actually removed whitespace.
        if trimmed.len() != session.source_agent.len() {
            session.source_agent = trimmed.to_owned();
        }

        if session.parent_message_id.is_some() && session.parent_session_id.is_none() {
            return Ok(vec![RowOutcome {
                index,
                kind: "session",
                pk: Value::String(session.id.clone()),
                status: OutcomeStatus::Error,
                error: Some(RowError {
                    message: format!(
                        "session {} has parent_message_id without parent_session_id",
                        session.id,
                    ),
                    field: Some("parent_message_id"),
                    reason: None,
                    reason_key: Some(DROP_REASON_PARENT_MESSAGE_WITHOUT_SESSION),
                }),
            }]);
        }

        self.seen_message_ids.clear();
        self.seen_part_keys.clear();
        self.session = Some(BufferedSession { index, session });
        Ok(Vec::new())
    }

    /// Flushes the current message and, if a session is open, moves the
    /// session and its buffered messages into `completed`. The duplicate-
    /// detection sets are reset for the next substream.
    fn close_current_substream(&mut self) {
        self.flush_current_message();
        let Some(BufferedSession {
            index: session_index,
            session,
        }) = self.session.take()
        else {
            return;
        };
        let messages = std::mem::take(&mut self.messages);
        self.seen_message_ids.clear();
        self.seen_part_keys.clear();
        self.completed.push(CompletedSubstream {
            session_index,
            session,
            messages,
        });
    }

    /// Accepts a message into the open session, making it the current message.
    ///
    /// Rejected when no session is open, when its `session_id` differs from the
    /// open session, or when its id was already seen in this substream.
    fn push_message(&mut self, index: usize, message: Message) -> Vec<RowOutcome> {
        let pk = Value::Array(vec![
            Value::String(message.session_id().to_owned()),
            Value::String(message.id().to_owned()),
        ]);
        let Some(session) = &self.session else {
            return vec![error_outcome(
                index,
                "message",
                pk,
                "first event in a session stream must be Session",
                None,
                DROP_REASON_MESSAGE_BEFORE_SESSION,
            )];
        };
        if message.session_id() != session.session.id {
            let msg = format!(
                "message {} references session {}, expected {}",
                message.id(),
                message.session_id(),
                session.session.id
            );
            return vec![error_outcome(
                index,
                "message",
                pk,
                &msg,
                Some("session_id"),
                DROP_REASON_MESSAGE_SESSION_MISMATCH,
            )];
        }
        if !self.seen_message_ids.insert(message.id().to_owned()) {
            let msg = format!("duplicate message id {} in session substream", message.id());
            // NOTE(review): `current_message` is left unchanged here. If the
            // duplicate repeats the *current* message's id, parts that follow it
            // still pass `push_part`'s checks and attach to the earlier message —
            // confirm this is intended.
            return vec![error_outcome(
                index,
                "message",
                pk,
                &msg,
                None,
                DROP_REASON_DUPLICATE_MESSAGE_ID,
            )];
        }
        // Finish the previous message before this one becomes current.
        self.flush_current_message();
        self.current_message = Some(BufferedMessage {
            index,
            message,
            parts: Vec::new(),
            search_text: None,
        });
        Vec::new()
    }

    /// Accepts a part for the current message.
    ///
    /// Rejected when no message is current, when its `session_id` or
    /// `message_id` does not match the current message, or when its
    /// `(message_id, id)` pair was already seen in this substream.
    fn push_part(&mut self, index: usize, part: Part) -> Vec<RowOutcome> {
        let pk = Value::Array(vec![
            Value::String(part.session_id.clone()),
            Value::String(part.message_id.clone()),
            Value::String(part.id.clone()),
        ]);
        let Some(current) = &self.current_message else {
            return vec![error_outcome(
                index,
                "part",
                pk,
                "part event appeared before a message",
                None,
                DROP_REASON_PART_BEFORE_MESSAGE,
            )];
        };
        if part.session_id != current.message.session_id() {
            let msg = format!(
                "part {} references session {}, expected {}",
                part.id,
                part.session_id,
                current.message.session_id()
            );
            return vec![error_outcome(
                index,
                "part",
                pk,
                &msg,
                Some("session_id"),
                DROP_REASON_PART_MESSAGE_MISMATCH,
            )];
        }
        if part.message_id != current.message.id() {
            let msg = format!(
                "part {} references message {}, expected {}",
                part.id,
                part.message_id,
                current.message.id()
            );
            return vec![error_outcome(
                index,
                "part",
                pk,
                &msg,
                Some("message_id"),
                DROP_REASON_PART_MESSAGE_MISMATCH,
            )];
        }
        let part_key = (part.message_id.clone(), part.id.clone());
        if !self.seen_part_keys.insert(part_key) {
            let msg = format!(
                "duplicate part id {} for message {} in session substream",
                part.id, part.message_id
            );
            return vec![error_outcome(
                index,
                "part",
                pk,
                &msg,
                None,
                DROP_REASON_DUPLICATE_PART_KEY,
            )];
        }
        self.current_parts.push(BufferedPart { index, part });
        Vec::new()
    }

    /// Moves the current message and its collected parts into `messages`,
    /// computing its `search_text` from a cloned copy of the parts. No-op when
    /// there is no current message.
    fn flush_current_message(&mut self) {
        let Some(mut buffered) = self.current_message.take() else {
            return;
        };
        let parts = std::mem::take(&mut self.current_parts);
        let mut canonical_parts = Vec::with_capacity(parts.len());
        for part in &parts {
            canonical_parts.push(part.part.clone());
        }
        buffered.search_text = search_text(&buffered.message, &canonical_parts);
        buffered.parts = parts;
        self.messages.push(buffered);
    }
}
2353
2354fn error_outcome(
2355 index: usize,
2356 kind: &'static str,
2357 pk: Value,
2358 message: &str,
2359 field: Option<&'static str>,
2360 reason_key: &'static str,
2361) -> RowOutcome {
2362 RowOutcome {
2363 index,
2364 kind,
2365 pk,
2366 status: OutcomeStatus::Error,
2367 error: Some(RowError {
2368 message: message.to_owned(),
2369 field,
2370 reason: None,
2371 reason_key: Some(reason_key),
2372 }),
2373 }
2374}
2375
2376fn error_outcomes_for_substream(
2381 session_index: usize,
2382 session: &Session,
2383 _messages: &[BufferedMessage],
2384 message: impl Into<String>,
2385 field: Option<&'static str>,
2386 reason_key: &'static str,
2387) -> Vec<RowOutcome> {
2388 let reason = field.map(|_| "immutable");
2389 vec![RowOutcome {
2390 index: session_index,
2391 kind: "session",
2392 pk: Value::String(session.id.clone()),
2393 status: OutcomeStatus::Error,
2394 error: Some(RowError {
2395 message: message.into(),
2396 field,
2397 reason,
2398 reason_key: Some(reason_key),
2399 }),
2400 }]
2401}
2402
2403fn success_outcomes_for_substream(
2407 session_index: usize,
2408 session: &Session,
2409 messages: &[BufferedMessage],
2410 status: UpsertStatus,
2411) -> Vec<RowOutcome> {
2412 let mut outcomes = Vec::with_capacity(1 + messages.len());
2413 outcomes.push(success_outcome(
2414 session_index,
2415 "session",
2416 Value::String(session.id.clone()),
2417 status,
2418 ));
2419 for buffered in messages {
2420 let pk = Value::Array(vec![
2421 Value::String(buffered.message.session_id().to_owned()),
2422 Value::String(buffered.message.id().to_owned()),
2423 ]);
2424 outcomes.push(success_outcome(buffered.index, "message", pk, status));
2425 for part in &buffered.parts {
2426 let part_pk = Value::Array(vec![
2427 Value::String(part.part.session_id.clone()),
2428 Value::String(part.part.message_id.clone()),
2429 Value::String(part.part.id.clone()),
2430 ]);
2431 outcomes.push(success_outcome(part.index, "part", part_pk, status));
2432 }
2433 }
2434 outcomes
2435}
2436
2437fn success_outcome(
2438 index: usize,
2439 kind: &'static str,
2440 pk: Value,
2441 status: UpsertStatus,
2442) -> RowOutcome {
2443 let status = match status {
2444 UpsertStatus::Inserted => OutcomeStatus::Inserted,
2445 UpsertStatus::Matched => OutcomeStatus::Matched,
2446 };
2447 RowOutcome {
2448 index,
2449 kind,
2450 pk,
2451 status,
2452 error: None,
2453 }
2454}
2455
/// Internal ingest validation failures.
#[derive(Debug, Clone, PartialEq, Eq)]
enum IngestError {
    /// An incoming session tried to change a field that must not change once
    /// stored (see `ensure_immutable_match`).
    ImmutableField {
        field: &'static str,
        session_id: String,
        stored: String,
        attempted: String,
    },
}

impl std::fmt::Display for IngestError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self::ImmutableField {
            field,
            session_id,
            stored,
            attempted,
        } = self;
        write!(
            f,
            "session {} {} is immutable: stored {:?}, attempted {:?}",
            session_id, field, stored, attempted
        )
    }
}

impl std::error::Error for IngestError {}
2487
2488fn ensure_immutable_match(
2492 existing: &Session,
2493 incoming: &Session,
2494) -> std::result::Result<(), IngestError> {
2495 if existing.source_agent != incoming.source_agent {
2496 return Err(IngestError::ImmutableField {
2497 field: "source_agent",
2498 session_id: incoming.id.clone(),
2499 stored: existing.source_agent.clone(),
2500 attempted: incoming.source_agent.clone(),
2501 });
2502 }
2503 if existing.project != incoming.project {
2504 return Err(IngestError::ImmutableField {
2505 field: "project",
2506 session_id: incoming.id.clone(),
2507 stored: (*existing.project).clone(),
2508 attempted: (*incoming.project).clone(),
2509 });
2510 }
2511 Ok(())
2512}
2513
2514pub fn search_text(message: &Message, parts: &[Part]) -> Option<String> {
2515 use crate::wire::Provenance;
2516 let mut chunks: Vec<String> = Vec::new();
2517 for part in parts {
2518 if part.provenance != Provenance::Conversational {
2521 continue;
2522 }
2523 match (message.role(), &part.kind) {
2524 (Role::User | Role::Assistant, PartKind::Text { text }) => {
2525 if let Some(text) = text {
2526 chunks.push(text.to_string());
2527 }
2528 }
2529 (
2530 Role::User | Role::Assistant,
2531 PartKind::File {
2532 media_type,
2533 file_name,
2534 data,
2535 },
2536 ) => {
2537 if let Some(file_name) = file_name {
2538 chunks.push(file_name.clone());
2539 }
2540 chunks.push(media_type.clone());
2541 if let FileData::Url(uri) = data {
2542 chunks.push(uri.clone());
2543 }
2544 }
2545 (
2546 Role::System | Role::Tool,
2547 PartKind::Text { .. }
2548 | PartKind::Reasoning { .. }
2549 | PartKind::File { .. }
2550 | PartKind::ToolCall { .. }
2551 | PartKind::ToolResult { .. }
2552 | PartKind::ToolApprovalRequest { .. }
2553 | PartKind::ToolApprovalResponse { .. },
2554 )
2555 | (
2556 Role::User | Role::Assistant,
2557 PartKind::Reasoning { .. }
2558 | PartKind::ToolCall { .. }
2559 | PartKind::ToolResult { .. }
2560 | PartKind::ToolApprovalRequest { .. }
2561 | PartKind::ToolApprovalResponse { .. },
2562 ) => {}
2563 }
2564 }
2565
2566 let text = chunks
2567 .into_iter()
2568 .filter(|chunk| !chunk.trim().is_empty())
2569 .collect::<Vec<_>>()
2570 .join("\n");
2571 if text.is_empty() { None } else { Some(text) }
2572}
2573
/// Owned wrapper around a message's search text.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SearchText(String);

impl SearchText {
    /// Borrows the wrapped text.
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }

    /// Unwraps into the owned `String`.
    pub fn into_inner(self) -> String {
        let SearchText(text) = self;
        text
    }
}

impl AsRef<str> for SearchText {
    fn as_ref(&self) -> &str {
        self.as_str()
    }
}
2593
/// A message bundled with its parts.
#[derive(Debug, Clone, PartialEq)]
pub struct MessageWithParts {
    pub message: Message,
    pub parts: Vec<Part>,
}

/// A session bundled with its messages, each carrying its own parts.
#[derive(Debug, Clone, PartialEq)]
pub struct SessionWithMessages {
    pub session: Session,
    pub messages: Vec<MessageWithParts>,
}
2605
/// Paging parameters for reading a session.
#[derive(Debug, Clone)]
pub struct SessionViewParams<'a> {
    /// How the response should be rendered.
    pub mode: ResponseMode,
    /// Optional cursor: presumably resume after this message id — confirm at call site.
    pub after_id: Option<&'a str>,
    /// Maximum number of messages per page (NOTE(review): `page_by` clamps its
    /// `limit` to `1..=1000`; confirm this value is routed there).
    pub limit: usize,
    /// Soft byte budget per page.
    pub budget_bytes: usize,
}

/// Paging parameters for reading a single message and its surroundings.
#[derive(Debug, Clone)]
pub struct MessageViewParams<'a> {
    /// How many neighbouring messages to include (presumably on each side — confirm).
    pub context_depth: usize,
    /// Optional cursor: presumably resume after this id — confirm at call site.
    pub after_id: Option<&'a str>,
    /// Maximum number of items per page.
    pub limit: usize,
    /// Soft byte budget per page.
    pub budget_bytes: usize,
}
2621
/// Result of a lookup that may be anchored on an `after_id` cursor.
#[derive(Debug, Clone, PartialEq)]
pub enum GetLookup<T> {
    /// The requested entity was not found.
    NotFound,
    /// Presumably: the supplied `after_id` did not match any item — confirm at producer.
    UnknownAfterId,
    /// The lookup succeeded.
    Found(T),
}

/// One page of a session's messages.
#[derive(Debug, Clone, PartialEq)]
pub struct SessionPage {
    pub session: Session,
    pub messages: Vec<RetrievedMessage>,
    /// Number of messages not included in this page (inferred from the name — confirm).
    pub messages_remaining: usize,
}

/// One page centred on a target message.
#[derive(Debug, Clone, PartialEq)]
pub struct MessagePage {
    pub session: Session,
    pub target: RetrievedMessage,
    pub target_parts: Vec<Part>,
    /// Number of the target's parts not included in `target_parts` (inferred — confirm).
    pub target_parts_remaining: usize,
    pub siblings: Vec<RetrievedMessage>,
}
2655
/// A message as returned to readers, with optional text fields and its parts.
#[derive(Debug, Clone, PartialEq)]
pub struct RetrievedMessage {
    pub id: String,
    pub role: Role,
    pub timestamp: DateTime<Utc>,
    /// NOTE(review): presumably the search text — confirm at the producer.
    pub text: Option<String>,
    /// NOTE(review): presumably the stored `content` column (system messages) — confirm.
    pub content: Option<String>,
    pub parts: Vec<Part>,
}

/// Internal scan projection: the same scalar fields as [`RetrievedMessage`]
/// but without parts.
#[derive(Debug, Clone)]
struct ScanRow {
    id: String,
    role: Role,
    timestamp: DateTime<Utc>,
    text: Option<String>,
    content: Option<String>,
}

/// A message row keyed by session and message id, carrying its search text.
#[derive(Debug, Clone)]
pub struct ConversationalRow {
    pub session_id: String,
    pub message_id: String,
    pub role: Role,
    pub timestamp: DateTime<Utc>,
    pub text: SearchText,
}
2685
/// Returns how many leading `items` fit on one page.
///
/// At most `limit` items are considered, with `limit` clamped to `1..=1000`.
/// Items are taken in order while the running total of `size` stays within
/// `budget_bytes`. The first item is always taken when one exists, so a
/// single oversized item cannot stall pagination.
fn page_by<T>(items: &[T], limit: usize, budget_bytes: usize, size: impl Fn(&T) -> usize) -> usize {
    let window = &items[..items.len().min(limit.clamp(1, 1000))];
    window
        .iter()
        .scan(0usize, |used, item| {
            *used = used.saturating_add(size(item));
            Some(*used)
        })
        .enumerate()
        .take_while(|&(position, used)| position == 0 || used <= budget_bytes)
        .count()
}
2703}
2704
2705fn role_from_str(value: &str) -> Result<Role> {
2706 match value {
2707 "system" => Ok(Role::System),
2708 "user" => Ok(Role::User),
2709 "assistant" => Ok(Role::Assistant),
2710 "tool" => Ok(Role::Tool),
2711 other => anyhow::bail!("unknown message role {other}"),
2712 }
2713}
2714
/// Scalar indices on the `messages` table, as `(column, index type, index name)`.
const MESSAGE_SCALAR_INDICES: &[(&str, BuiltinIndexType, &str)] = &[
    ("project", BuiltinIndexType::BTree, "messages_project_btree"),
    (
        "session_id",
        BuiltinIndexType::BTree,
        "messages_session_id_btree",
    ),
    (
        "timestamp",
        BuiltinIndexType::BTree,
        "messages_timestamp_btree",
    ),
    (
        "source_agent",
        BuiltinIndexType::Bitmap,
        "messages_source_agent_bitmap",
    ),
    ("role", BuiltinIndexType::Bitmap, "messages_role_bitmap"),
];

/// Scalar indices on the `parts` table, as `(column, index type, index name)`.
const PARTS_SCALAR_INDICES: &[(&str, BuiltinIndexType, &str)] = &[
    (
        "session_id",
        BuiltinIndexType::BTree,
        "parts_session_id_btree",
    ),
    (
        "message_id",
        BuiltinIndexType::BTree,
        "parts_message_id_btree",
    ),
];

/// Scalar indices on the `sessions` table, as `(column, index type, index name)`.
const SESSIONS_SCALAR_INDICES: &[(&str, BuiltinIndexType, &str)] =
    &[("id", BuiltinIndexType::BTree, "sessions_id_btree")];
2761
2762fn in_predicate(column: &'static str, values: &[String]) -> Predicate {
2763 Predicate::In(
2764 column,
2765 values.iter().cloned().map(ScalarValue::String).collect(),
2766 )
2767}
2768
2769fn embedded_scope(filter: &Predicate) -> Predicate {
2774 Predicate::And(vec![Predicate::IsNotNull("vector"), filter.clone()])
2775}
2776
2777fn statuses_from_inserted(total: usize, inserted_rows: u64) -> Vec<UpsertStatus> {
2778 let inserted = usize::try_from(inserted_rows)
2779 .unwrap_or(usize::MAX)
2780 .min(total);
2781 let mut statuses = Vec::with_capacity(total);
2782 statuses.extend(std::iter::repeat_n(UpsertStatus::Inserted, inserted));
2783 statuses.extend(std::iter::repeat_n(
2784 UpsertStatus::Matched,
2785 total.saturating_sub(inserted),
2786 ));
2787 statuses
2788}
2789
/// Name of the `sessions` table.
pub(crate) const SESSIONS: &str = "sessions";
/// Name of the `messages` table.
pub(crate) const MESSAGES: &str = "messages";
/// Name of the `parts` table.
pub(crate) const PARTS: &str = "parts";

/// Name of the full-text index on `messages.search_text`.
pub(crate) const MESSAGES_FTS_INDEX: &str = "messages_search_text_fts";

/// Name of the IVF-PQ vector index on `messages.vector`.
pub(crate) const MESSAGES_VECTOR_INDEX: &str = "messages_vector_ivfpq";

/// Bits per product-quantization code in the vector index.
const IVF_PQ_NUM_BITS: u8 = 8;
/// Target number of embedding dimensions per PQ sub-vector.
const IVF_PQ_SUB_VECTOR_STRIDE: usize = 8;
/// Maximum training iterations passed to the IVF-PQ index builder.
const IVF_PQ_MAX_ITERS: usize = 15;

/// Shortest n-gram length for the `search_text` FTS index.
const FTS_NGRAM_MIN: u32 = 3;
/// Longest n-gram length for the `search_text` FTS index.
const FTS_NGRAM_MAX: u32 = 5;
2819
/// The index plan for all pond tables, using the default vector-index
/// activation threshold [`VECTOR_INDEX_ACTIVATION_ROWS`].
pub fn pond_index_intents() -> IndexIntents {
    pond_index_intents_with_vector_threshold(VECTOR_INDEX_ACTIVATION_ROWS)
}
2825
2826pub(crate) fn pond_index_intents_with_vector_threshold(vector_threshold: usize) -> IndexIntents {
2830 let mut messages = Vec::with_capacity(MESSAGE_SCALAR_INDICES.len() + 2);
2831 messages.push(IndexIntent {
2832 name: MESSAGES_FTS_INDEX,
2833 column: "search_text",
2834 trigger: IndexTrigger::OnAnyRows,
2835 params: IndexParamsKind::InvertedFtsNgram {
2836 min: FTS_NGRAM_MIN,
2837 max: FTS_NGRAM_MAX,
2838 },
2839 });
2840 for (column, kind, name) in MESSAGE_SCALAR_INDICES {
2841 messages.push(IndexIntent {
2842 name,
2843 column,
2844 trigger: IndexTrigger::OnAnyRows,
2845 params: IndexParamsKind::Scalar(kind.clone()),
2846 });
2847 }
2848 messages.push(IndexIntent {
2849 name: MESSAGES_VECTOR_INDEX,
2850 column: "vector",
2851 trigger: IndexTrigger::OnNonNullCount {
2852 column: "vector",
2853 threshold: vector_threshold,
2854 },
2855 params: IndexParamsKind::IvfPqCosine {
2856 sub_vectors: embedding_dim() / IVF_PQ_SUB_VECTOR_STRIDE,
2857 num_bits: IVF_PQ_NUM_BITS,
2858 max_iters: IVF_PQ_MAX_ITERS,
2859 },
2860 });
2861 let parts = PARTS_SCALAR_INDICES
2862 .iter()
2863 .map(|(column, kind, name)| IndexIntent {
2864 name,
2865 column,
2866 trigger: IndexTrigger::OnAnyRows,
2867 params: IndexParamsKind::Scalar(kind.clone()),
2868 })
2869 .collect();
2870 let sessions = SESSIONS_SCALAR_INDICES
2871 .iter()
2872 .map(|(column, kind, name)| IndexIntent {
2873 name,
2874 column,
2875 trigger: IndexTrigger::OnAnyRows,
2876 params: IndexParamsKind::Scalar(kind.clone()),
2877 })
2878 .collect();
2879 IndexIntents {
2880 sessions,
2881 messages,
2882 parts,
2883 }
2884}
2885
/// Embedding width used until [`init_embedding_dim`] is called.
pub const DEFAULT_EMBEDDING_DIM: usize = 384;

/// Process-wide embedding width; written at most once by [`init_embedding_dim`].
static EMBEDDING_DIM_RUNTIME: std::sync::OnceLock<usize> = std::sync::OnceLock::new();

/// Returns the configured embedding width, or [`DEFAULT_EMBEDDING_DIM`] when
/// none has been set.
pub fn embedding_dim() -> usize {
    match EMBEDDING_DIM_RUNTIME.get() {
        Some(&dim) => dim,
        None => DEFAULT_EMBEDDING_DIM,
    }
}

/// Records the process-wide embedding width.
///
/// The first call wins; later calls are silently ignored, even with a
/// different `dim`.
pub fn init_embedding_dim(dim: usize) {
    // `set` only fails when a value is already present, which is exactly the
    // first-call-wins case, so the error is deliberately discarded.
    let _ = EMBEDDING_DIM_RUNTIME.set(dim);
}
2911
/// `WriteParams` used when creating a pond table.
///
/// Pins the data storage version to Lance file format v2.1 and enables v2
/// manifest paths and stable row ids. It also records an auto-cleanup policy
/// (`interval: 20`, versions older than one day) while setting
/// `skip_auto_cleanup` for this write.
/// NOTE(review): presumably the policy is persisted for later writes while
/// the create itself skips cleanup — confirm against Lance's `WriteParams` docs.
pub(crate) fn write_params_for_create() -> WriteParams {
    WriteParams {
        data_storage_version: Some(LanceFileVersion::V2_1),
        enable_v2_manifest_paths: true,
        enable_stable_row_ids: true,
        auto_cleanup: Some(AutoCleanupParams {
            interval: 20,
            older_than: chrono::TimeDelta::days(1),
        }),
        skip_auto_cleanup: true,
        ..WriteParams::default()
    }
}
2931
/// The Arrow schema this build uses for `table`, used to validate archives.
fn export_schema(table: Table) -> Arc<Schema> {
    match table {
        Table::Sessions => session_schema(),
        Table::Messages => message_schema(),
        Table::Parts => part_schema(),
    }
}
2939
2940fn ensure_schema_matches_archive(dataset: &Dataset, table: Table) -> Result<()> {
2941 let expected = export_schema(table);
2942 let actual = lance::deps::arrow_schema::Schema::from(dataset.schema());
2943 let actual_names: Vec<_> = actual.fields().iter().map(|field| field.name()).collect();
2944 let expected_names: Vec<_> = expected.fields().iter().map(|field| field.name()).collect();
2945 if actual_names != expected_names {
2946 anyhow::bail!(
2947 "{} archive table has columns {actual_names:?} but this pond build expects {expected_names:?}",
2948 table.as_str(),
2949 );
2950 }
2951 Ok(())
2952}
2953
2954async fn open_archive_table(table: Table, source: &Path) -> Result<Dataset> {
2955 let source_uri = source
2956 .to_str()
2957 .with_context(|| format!("archive path is not UTF-8: {}", source.display()))?;
2958 let dataset = Dataset::open(source_uri)
2959 .await
2960 .with_context(|| format!("failed to open {} archive table", table.as_str()))?;
2961 ensure_schema_matches_archive(&dataset, table)?;
2962 Ok(dataset)
2963}
2964
/// Arrow schema of the `sessions` table; `id` is the (unenforced) primary key.
pub(crate) fn session_schema() -> Arc<Schema> {
    Arc::new(Schema::new(vec![
        primary_field("id", DataType::Utf8, false),
        Field::new("parent_session_id", DataType::Utf8, true),
        Field::new("parent_message_id", DataType::Utf8, true),
        Field::new("source_agent", DataType::Utf8, false),
        Field::new(
            "created_at",
            DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
            false,
        ),
        Field::new("project", DataType::Utf8, false),
        json_field("options", false),
    ]))
}

/// Arrow schema of the `messages` table, keyed by (`session_id`, `id`).
///
/// `vector` and `embedding_model` are nullable: `messages_chunk` writes them
/// as nulls, and `embedding_update_batch` produces batches carrying them.
pub(crate) fn message_schema() -> Arc<Schema> {
    Arc::new(Schema::new(vec![
        primary_field("session_id", DataType::Utf8, false),
        primary_field("id", DataType::Utf8, false),
        Field::new(
            "timestamp",
            DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
            false,
        ),
        Field::new("role", DataType::Utf8, false),
        Field::new("source_agent", DataType::Utf8, false),
        Field::new("project", DataType::Utf8, false),
        Field::new("content", DataType::Utf8, true),
        Field::new("search_text", DataType::Utf8, true),
        Field::new("vector", embedding_vector_type(), true),
        Field::new("embedding_model", DataType::Utf8, true),
        json_field("options", false),
    ]))
}

/// Arrow schema of the `parts` table, keyed by (`session_id`, `message_id`, `id`).
///
/// File payloads live in the nullable `data` blob column; `variant_data`
/// holds the rest of the part variant as JSON.
pub(crate) fn part_schema() -> Arc<Schema> {
    Arc::new(Schema::new(vec![
        primary_field("session_id", DataType::Utf8, false),
        primary_field("message_id", DataType::Utf8, false),
        primary_field("id", DataType::Utf8, false),
        Field::new("ordinal", DataType::Int32, false),
        Field::new("type", DataType::Utf8, false),
        Field::new("provenance", DataType::Utf8, false),
        json_field("variant_data", false),
        legacy_blob_field("data", true),
        json_field("options", false),
    ]))
}
3018
3019pub(crate) fn empty_batch(schema: Arc<Schema>) -> Result<RecordBatch> {
3020 let arrays = schema
3021 .fields()
3022 .iter()
3023 .map(|field| lance::deps::arrow_array::new_empty_array(field.data_type()))
3024 .collect();
3025 RecordBatch::try_new(schema, arrays).context("failed to build empty Lance batch")
3026}
3027
3028pub(crate) fn empty_reader(
3029 schema: Arc<Schema>,
3030) -> Result<
3031 RecordBatchIterator<
3032 std::vec::IntoIter<Result<RecordBatch, lance::deps::arrow_schema::ArrowError>>,
3033 >,
3034> {
3035 let batch = empty_batch(schema.clone())?;
3036 Ok(RecordBatchIterator::new(
3037 vec![Ok(batch)].into_iter(),
3038 schema,
3039 ))
3040}
3041
/// Borrowed inputs for one `messages` row: the message plus the extra
/// columns it is stored with.
pub(crate) struct MessageBatchRow<'a> {
    pub message: &'a Message,
    /// Written to the `source_agent` column (presumably the owning session's value — confirm).
    pub source_agent: &'a str,
    /// Written to the `project` column (presumably the owning session's value — confirm).
    pub project: &'a str,
    /// Written to `search_text`; `None` stores a null.
    pub search_text: Option<&'a str>,
}
3048
/// Arrow type of the `vector` column: a fixed-size list of `embedding_dim()`
/// nullable `Float16` items.
fn embedding_vector_type() -> DataType {
    DataType::FixedSizeList(
        Arc::new(Field::new("item", DataType::Float16, true)),
        // NOTE(review): `as i32` would silently truncate a dimension above
        // `i32::MAX`; realistic embedding widths are far below that.
        embedding_dim() as i32,
    )
}

/// Schema of the partial batch used to write embeddings back: the message
/// key (`session_id`, `id`) plus `vector` and `embedding_model`.
fn embedding_update_schema() -> Arc<Schema> {
    Arc::new(Schema::new(vec![
        primary_field("session_id", DataType::Utf8, false),
        primary_field("id", DataType::Utf8, false),
        Field::new("vector", embedding_vector_type(), true),
        Field::new("embedding_model", DataType::Utf8, true),
    ]))
}
3072
3073pub(crate) fn embedding_update_batch(rows: &[EmbeddedMessage]) -> Result<RecordBatch> {
3076 let dim = embedding_dim();
3077 let mut flat = Vec::with_capacity(rows.len() * dim);
3078 for row in rows {
3079 if row.vector.len() != dim {
3080 anyhow::bail!(
3081 "embedding for message {} has dim {}, expected {dim}",
3082 row.id,
3083 row.vector.len(),
3084 );
3085 }
3086 flat.extend(row.vector.iter().map(|value| half::f16::from_f32(*value)));
3087 }
3088 let values = Float16Array::from(flat);
3089 let item_field = Arc::new(Field::new("item", DataType::Float16, true));
3090 let vectors = FixedSizeListArray::try_new(item_field, dim as i32, Arc::new(values), None)
3091 .context("failed to build embedding vector column")?;
3092
3093 RecordBatch::try_new(
3094 embedding_update_schema(),
3095 vec![
3096 Arc::new(StringArray::from(
3097 rows.iter()
3098 .map(|row| row.session_id.as_str())
3099 .collect::<Vec<_>>(),
3100 )),
3101 Arc::new(StringArray::from(
3102 rows.iter().map(|row| row.id.as_str()).collect::<Vec<_>>(),
3103 )),
3104 Arc::new(vectors),
3105 Arc::new(StringArray::from(vec![embed::model_id(); rows.len()])),
3106 ],
3107 )
3108 .context("failed to build embedding update batch")
3109}
3110
/// Byte ceiling for packing rows into one Arrow batch (1 GiB), kept well
/// below `i32::MAX` so `Utf8` columns (32-bit offsets) cannot overflow.
const COLUMN_BYTE_BUDGET: usize = 1 << 30;

/// Splits row indices `0..cells.len()` into contiguous ranges whose summed
/// byte counts stay within [`COLUMN_BYTE_BUDGET`].
///
/// `cells[i]` is the byte size of row `i`. A range is closed just before the
/// row that would push it over budget. A row that alone exceeds the budget
/// still forms its own range. Empty input yields no ranges.
fn chunk_ranges(cells: &[usize]) -> Vec<std::ops::Range<usize>> {
    let mut ranges = Vec::new();
    let mut chunk_start = 0usize;
    let mut chunk_bytes = 0usize;
    for (index, row_bytes) in cells.iter().copied().enumerate() {
        let overflows = chunk_bytes + row_bytes > COLUMN_BYTE_BUDGET;
        if overflows && chunk_start < index {
            ranges.push(chunk_start..index);
            chunk_start = index;
            chunk_bytes = row_bytes;
        } else {
            chunk_bytes += row_bytes;
        }
    }
    if chunk_start != cells.len() {
        ranges.push(chunk_start..cells.len());
    }
    ranges
}
3138
3139fn guard_cell(table: &str, pk: &str, bytes: usize) -> Result<()> {
3140 if bytes >= COLUMN_BYTE_BUDGET {
3141 anyhow::bail!(
3142 "{table} row {pk}: a {bytes}-byte text cell meets the per-cell ceiling and would \
3143 overflow Arrow's i32 offset buffer"
3144 );
3145 }
3146 Ok(())
3147}
3148
3149async fn merge_insert_chunks(
3150 handle: &Handle,
3151 table: Table,
3152 batches: Vec<RecordBatch>,
3153) -> Result<u64> {
3154 let mut inserted = 0u64;
3155 for batch in batches {
3156 let rows = batch.num_rows();
3157 inserted += handle.merge_insert(table, batch, rows).await?;
3158 }
3159 Ok(inserted)
3160}
3161
3162pub(crate) fn sessions_batches(sessions: &[Session]) -> Result<Vec<RecordBatch>> {
3163 let options = sessions
3164 .iter()
3165 .map(|session| json_bytes(&session.options))
3166 .collect::<Result<Vec<_>>>()?;
3167 let mut cells = Vec::with_capacity(sessions.len());
3168 for (session, encoded) in sessions.iter().zip(&options) {
3169 let columns = [
3170 session.id.len(),
3171 session.parent_session_id.as_deref().map_or(0, str::len),
3172 session.parent_message_id.as_deref().map_or(0, str::len),
3173 session.source_agent.len(),
3174 session.project.as_str().len(),
3175 encoded.len(),
3176 ];
3177 for bytes in columns {
3178 guard_cell("sessions", &session.id, bytes)?;
3179 }
3180 cells.push(columns.iter().sum());
3181 }
3182 chunk_ranges(&cells)
3183 .into_iter()
3184 .map(|range| sessions_chunk(&sessions[range.clone()], &options[range]))
3185 .collect()
3186}
3187
/// Assembles one `sessions` batch from `sessions` and their pre-encoded
/// `options` (`options[i]` belongs to `sessions[i]`).
///
/// Columns are supplied positionally and must stay in [`session_schema`] order.
fn sessions_chunk(sessions: &[Session], options: &[Vec<u8>]) -> Result<RecordBatch> {
    let schema = session_schema();
    RecordBatch::try_new(
        schema.clone(),
        vec![
            // id
            Arc::new(StringArray::from(
                sessions
                    .iter()
                    .map(|session| session.id.as_str())
                    .collect::<Vec<_>>(),
            )),
            // parent_session_id (nullable)
            Arc::new(StringArray::from(
                sessions
                    .iter()
                    .map(|session| session.parent_session_id.as_deref())
                    .collect::<Vec<_>>(),
            )),
            // parent_message_id (nullable)
            Arc::new(StringArray::from(
                sessions
                    .iter()
                    .map(|session| session.parent_message_id.as_deref())
                    .collect::<Vec<_>>(),
            )),
            // source_agent
            Arc::new(StringArray::from(
                sessions
                    .iter()
                    .map(|session| session.source_agent.as_str())
                    .collect::<Vec<_>>(),
            )),
            // created_at, microseconds tagged UTC to match the schema
            Arc::new(
                TimestampMicrosecondArray::from(
                    sessions
                        .iter()
                        .map(|session| micros(session.created_at))
                        .collect::<Vec<_>>(),
                )
                .with_timezone("UTC"),
            ),
            // project
            Arc::new(StringArray::from(
                sessions
                    .iter()
                    .map(|session| session.project.as_str())
                    .collect::<Vec<_>>(),
            )),
            // options (already JSONB-encoded by the caller)
            Arc::new(LargeBinaryArray::from_iter_values(
                options.iter().map(Vec::as_slice),
            )),
        ],
    )
    .context("failed to build session batch")
}
3239
3240pub(crate) fn messages_batches(rows: &[MessageBatchRow<'_>]) -> Result<Vec<RecordBatch>> {
3241 let options = rows
3242 .iter()
3243 .map(|row| json_bytes(row.message.options()))
3244 .collect::<Result<Vec<_>>>()?;
3245 let mut cells = Vec::with_capacity(rows.len());
3246 for (row, encoded) in rows.iter().zip(&options) {
3247 let columns = [
3248 row.message.session_id().len(),
3249 row.message.id().len(),
3250 row.message.role().as_str().len(),
3251 row.source_agent.len(),
3252 row.project.len(),
3253 row.message.system_content().map_or(0, str::len),
3254 row.search_text.map_or(0, str::len),
3255 encoded.len(),
3256 ];
3257 for bytes in columns {
3258 guard_cell("messages", row.message.id(), bytes)?;
3259 }
3260 cells.push(columns.iter().sum());
3261 }
3262 chunk_ranges(&cells)
3263 .into_iter()
3264 .map(|range| messages_chunk(&rows[range.clone()], &options[range]))
3265 .collect()
3266}
3267
/// Assembles one `messages` batch from `rows` and their pre-encoded
/// `options` (`options[i]` belongs to `rows[i]`).
///
/// Columns are supplied positionally and must stay in [`message_schema`] order.
fn messages_chunk(rows: &[MessageBatchRow<'_>], options: &[Vec<u8>]) -> Result<RecordBatch> {
    let schema = message_schema();
    RecordBatch::try_new(
        schema.clone(),
        vec![
            // session_id
            Arc::new(StringArray::from(
                rows.iter()
                    .map(|row| row.message.session_id())
                    .collect::<Vec<_>>(),
            )),
            // id
            Arc::new(StringArray::from(
                rows.iter().map(|row| row.message.id()).collect::<Vec<_>>(),
            )),
            // timestamp, microseconds tagged UTC to match the schema
            Arc::new(
                TimestampMicrosecondArray::from(
                    rows.iter()
                        .map(|row| micros(row.message.timestamp()))
                        .collect::<Vec<_>>(),
                )
                .with_timezone("UTC"),
            ),
            // role
            Arc::new(StringArray::from(
                rows.iter()
                    .map(|row| row.message.role().as_str())
                    .collect::<Vec<_>>(),
            )),
            // source_agent
            Arc::new(StringArray::from(
                rows.iter().map(|row| row.source_agent).collect::<Vec<_>>(),
            )),
            // project
            Arc::new(StringArray::from(
                rows.iter().map(|row| row.project).collect::<Vec<_>>(),
            )),
            // content (nullable; from `system_content()`)
            Arc::new(StringArray::from(
                rows.iter()
                    .map(|row| row.message.system_content())
                    .collect::<Vec<_>>(),
            )),
            // search_text (nullable)
            Arc::new(StringArray::from(
                rows.iter().map(|row| row.search_text).collect::<Vec<_>>(),
            )),
            // vector and embedding_model start out null here; presumably they
            // are filled in later via `embedding_update_batch` — confirm.
            new_null_array(&embedding_vector_type(), rows.len()),
            new_null_array(&DataType::Utf8, rows.len()),
            // options (already JSONB-encoded by the caller)
            Arc::new(LargeBinaryArray::from_iter_values(
                options.iter().map(Vec::as_slice),
            )),
        ],
    )
    .context("failed to build message batch")
}
3320
3321pub(crate) fn parts_batches(parts: &[Part]) -> Result<Vec<RecordBatch>> {
3322 let variant_data = parts
3323 .iter()
3324 .map(|part| part_variant_json(&part.kind))
3325 .collect::<Result<Vec<_>>>()?;
3326 let options = parts
3327 .iter()
3328 .map(|part| json_bytes(&part.options))
3329 .collect::<Result<Vec<_>>>()?;
3330 let mut cells = Vec::with_capacity(parts.len());
3331 for ((part, variant), encoded) in parts.iter().zip(&variant_data).zip(&options) {
3334 let columns = [
3335 part.session_id.len(),
3336 part.message_id.len(),
3337 part.id.len(),
3338 part.kind.type_name().len(),
3339 part.provenance.as_str().len(),
3340 variant.len(),
3341 encoded.len(),
3342 ];
3343 for bytes in columns {
3344 guard_cell("parts", &part.id, bytes)?;
3345 }
3346 cells.push(columns.iter().sum());
3347 }
3348 chunk_ranges(&cells)
3349 .into_iter()
3350 .map(|range| {
3351 parts_chunk(
3352 &parts[range.clone()],
3353 &variant_data[range.clone()],
3354 &options[range],
3355 )
3356 })
3357 .collect()
3358}
3359
/// Assembles one `parts` batch from `parts` and their pre-encoded
/// `variant_data` and `options` (index `i` of each slice belongs to `parts[i]`).
///
/// Columns are supplied positionally and must stay in [`part_schema`] order.
fn parts_chunk(
    parts: &[Part],
    variant_data: &[Vec<u8>],
    options: &[Vec<u8>],
) -> Result<RecordBatch> {
    let schema = part_schema();
    // The `data` blob column holds the raw file payload for file parts (string
    // and URL payloads as their UTF-8 bytes) and is null for every other kind.
    let blob_payloads: Vec<Option<&[u8]>> = parts
        .iter()
        .map(|part| match &part.kind {
            PartKind::File { data, .. } => Some(match data {
                FileData::String(value) => value.as_bytes(),
                FileData::Bytes(value) => value.as_slice(),
                FileData::Url(value) => value.as_bytes(),
            }),
            PartKind::Text { .. }
            | PartKind::Reasoning { .. }
            | PartKind::ToolCall { .. }
            | PartKind::ToolResult { .. }
            | PartKind::ToolApprovalRequest { .. }
            | PartKind::ToolApprovalResponse { .. } => None,
        })
        .collect();
    let blob_array = LargeBinaryArray::from_iter(blob_payloads);

    RecordBatch::try_new(
        schema.clone(),
        vec![
            // session_id
            Arc::new(StringArray::from(
                parts
                    .iter()
                    .map(|part| part.session_id.as_str())
                    .collect::<Vec<_>>(),
            )),
            // message_id
            Arc::new(StringArray::from(
                parts
                    .iter()
                    .map(|part| part.message_id.as_str())
                    .collect::<Vec<_>>(),
            )),
            // id
            Arc::new(StringArray::from(
                parts
                    .iter()
                    .map(|part| part.id.as_str())
                    .collect::<Vec<_>>(),
            )),
            // ordinal
            Arc::new(Int32Array::from(
                parts.iter().map(|part| part.ordinal).collect::<Vec<_>>(),
            )),
            // type
            Arc::new(StringArray::from(
                parts
                    .iter()
                    .map(|part| part.kind.type_name())
                    .collect::<Vec<_>>(),
            )),
            // provenance
            Arc::new(StringArray::from(
                parts
                    .iter()
                    .map(|part| part.provenance.as_str())
                    .collect::<Vec<_>>(),
            )),
            // variant_data (already JSONB-encoded by the caller)
            Arc::new(LargeBinaryArray::from_iter_values(
                variant_data.iter().map(Vec::as_slice),
            )),
            // data
            Arc::new(blob_array),
            // options (already JSONB-encoded by the caller)
            Arc::new(LargeBinaryArray::from_iter_values(
                options.iter().map(Vec::as_slice),
            )),
        ],
    )
    .context("failed to build parts batch")
}
3434
3435pub(crate) fn session_from_batch(batch: &RecordBatch, row: usize) -> Result<Session> {
3436 Ok(Session {
3437 id: string(batch, "id", row)?.context("session id is null")?,
3438 parent_session_id: string(batch, "parent_session_id", row)?,
3439 parent_message_id: string(batch, "parent_message_id", row)?,
3440 source_agent: string(batch, "source_agent", row)?.context("source_agent is null")?,
3441 created_at: datetime(batch, "created_at", row)?,
3442 project: crate::adapter::Extracted::from_stored(
3443 string(batch, "project", row)?.context("project is null")?,
3444 ),
3445 options: json_parse(&json_column(batch, "options", row)?.context("options is null")?)?,
3446 })
3447}
3448
3449pub(crate) fn message_from_batch(batch: &RecordBatch, row: usize) -> Result<Message> {
3450 let id = string(batch, "id", row)?.context("message id is null")?;
3451 let session_id = string(batch, "session_id", row)?.context("message session_id is null")?;
3452 let timestamp = datetime(batch, "timestamp", row)?;
3453 let options =
3454 json_parse(&json_column(batch, "options", row)?.context("message options is null")?)?;
3455
3456 match string(batch, "role", row)?
3457 .context("message role is null")?
3458 .as_str()
3459 {
3460 "system" => Ok(Message::System {
3461 id,
3462 session_id,
3463 timestamp,
3464 content: string(batch, "content", row)?.map(crate::adapter::Extracted::from_stored),
3471 options,
3472 }),
3473 "user" => Ok(Message::User {
3474 id,
3475 session_id,
3476 timestamp,
3477 options,
3478 }),
3479 "assistant" => Ok(Message::Assistant {
3480 id,
3481 session_id,
3482 timestamp,
3483 options,
3484 }),
3485 "tool" => Ok(Message::Tool {
3486 id,
3487 session_id,
3488 timestamp,
3489 options,
3490 }),
3491 other => anyhow::bail!("unknown message role {other}"),
3492 }
3493}
3494
3495pub(crate) fn part_from_batch(
3496 batch: &RecordBatch,
3497 row: usize,
3498 file_data: Option<FileData>,
3499) -> Result<Part> {
3500 let type_name = string(batch, "type", row)?.context("part type is null")?;
3501 let variant_data = json_column(batch, "variant_data", row)?.context("variant_data is null")?;
3502 let provenance = string(batch, "provenance", row)?.context("part provenance is null")?;
3503 Ok(Part {
3504 session_id: string(batch, "session_id", row)?.context("part session_id is null")?,
3505 message_id: string(batch, "message_id", row)?.context("part message_id is null")?,
3506 id: string(batch, "id", row)?.context("part id is null")?,
3507 ordinal: int32(batch, "ordinal", row)?,
3508 provenance: provenance_from_str(&provenance)?,
3509 options: json_parse(&json_column(batch, "options", row)?.context("part options is null")?)?,
3510 kind: part_kind_from_json(&type_name, &variant_data, file_data)?,
3511 })
3512}
3513
3514fn provenance_from_str(value: &str) -> Result<crate::wire::Provenance> {
3515 match value {
3516 "conversational" => Ok(crate::wire::Provenance::Conversational),
3517 "injected" => Ok(crate::wire::Provenance::Injected),
3518 other => anyhow::bail!("unknown part provenance {other}"),
3519 }
3520}
3521
3522fn file_data_from_blob(variant_data: &[u8], bytes: &[u8]) -> Result<FileData> {
3523 let kind = file_data_kind(variant_data)?;
3524 match kind.as_str() {
3525 "string" => {
3526 let text = std::str::from_utf8(bytes)
3527 .context("file string payload is not UTF-8")?
3528 .to_owned();
3529 Ok(FileData::String(text))
3530 }
3531 "bytes" => Ok(FileData::Bytes(bytes.to_vec())),
3532 "url" => Ok(FileData::Url(
3533 std::str::from_utf8(bytes)
3534 .context("file URL payload is not UTF-8")?
3535 .to_owned(),
3536 )),
3537 other => anyhow::bail!("unknown file data_kind {other}"),
3538 }
3539}
3540
3541fn file_data_kind(variant_data: &[u8]) -> Result<String> {
3542 let value = json_parse::<Value>(variant_data)?;
3543 value
3544 .get("data_kind")
3545 .and_then(Value::as_str)
3546 .map(str::to_owned)
3547 .context("file part variant_data missing data_kind")
3548}
3549
3550fn uint64<'a>(batch: &'a RecordBatch, name: &str) -> Result<&'a UInt64Array> {
3551 batch
3552 .column_by_name(name)
3553 .with_context(|| format!("missing column {name}"))?
3554 .as_any()
3555 .downcast_ref::<UInt64Array>()
3556 .with_context(|| format!("column {name} is not UInt64"))
3557}
3558
/// Buckets an alphabetic character into a coarse script name by Unicode
/// block range.
///
/// Returns `None` for non-alphabetic characters and `Some("Other")` for
/// alphabetic characters outside the listed ranges.
fn classify_script(ch: char) -> Option<&'static str> {
    if !ch.is_alphabetic() {
        return None;
    }
    let script = match ch {
        'A'..='Z' | 'a'..='z' | '\u{00C0}'..='\u{024F}' => "Latin",
        '\u{0370}'..='\u{03FF}' => "Greek",
        '\u{0400}'..='\u{052F}' => "Cyrillic",
        '\u{0590}'..='\u{05FF}' => "Hebrew",
        '\u{0600}'..='\u{06FF}' | '\u{0750}'..='\u{077F}' => "Arabic",
        '\u{0900}'..='\u{097F}' => "Devanagari",
        '\u{0E00}'..='\u{0E7F}' => "Thai",
        '\u{3040}'..='\u{309F}' => "Hiragana",
        '\u{30A0}'..='\u{30FF}' => "Katakana",
        '\u{3400}'..='\u{4DBF}' | '\u{4E00}'..='\u{9FFF}' => "Han",
        '\u{1100}'..='\u{11FF}' | '\u{AC00}'..='\u{D7AF}' => "Hangul",
        _ => "Other",
    };
    Some(script)
}
3585
3586pub(crate) fn string(batch: &RecordBatch, name: &str, row: usize) -> Result<Option<String>> {
3587 let array = batch
3588 .column_by_name(name)
3589 .with_context(|| format!("missing column {name}"))?
3590 .as_any()
3591 .downcast_ref::<StringArray>()
3592 .with_context(|| format!("column {name} is not Utf8"))?;
3593 if array.is_null(row) {
3594 Ok(None)
3595 } else {
3596 Ok(Some(array.value(row).to_owned()))
3597 }
3598}
3599
3600fn json_column(batch: &RecordBatch, name: &str, row: usize) -> Result<Option<Vec<u8>>> {
3601 let column = batch
3605 .column_by_name(name)
3606 .with_context(|| format!("missing column {name}"))?;
3607 if let Some(array) = column.as_any().downcast_ref::<LargeBinaryArray>() {
3608 return if array.is_null(row) {
3609 Ok(None)
3610 } else {
3611 Ok(Some(
3612 lance_arrow::json::decode_json(array.value(row)).into_bytes(),
3613 ))
3614 };
3615 }
3616 if let Some(array) = column.as_any().downcast_ref::<StringArray>() {
3617 return if array.is_null(row) {
3618 Ok(None)
3619 } else {
3620 Ok(Some(array.value(row).as_bytes().to_vec()))
3621 };
3622 }
3623 if let Some(array) = column.as_any().downcast_ref::<LargeStringArray>() {
3624 return if array.is_null(row) {
3625 Ok(None)
3626 } else {
3627 Ok(Some(array.value(row).as_bytes().to_vec()))
3628 };
3629 }
3630 anyhow::bail!("column {name} is not a JSON-compatible array")
3631}
3632
3633fn int32(batch: &RecordBatch, name: &str, row: usize) -> Result<i32> {
3634 let array = batch
3635 .column_by_name(name)
3636 .with_context(|| format!("missing column {name}"))?
3637 .as_any()
3638 .downcast_ref::<Int32Array>()
3639 .with_context(|| format!("column {name} is not Int32"))?;
3640 Ok(array.value(row))
3641}
3642
3643pub(crate) fn float32(batch: &RecordBatch, name: &str, row: usize) -> Result<f32> {
3644 let array = batch
3645 .column_by_name(name)
3646 .with_context(|| format!("missing column {name}"))?
3647 .as_any()
3648 .downcast_ref::<Float32Array>()
3649 .with_context(|| format!("column {name} is not Float32"))?;
3650 Ok(array.value(row))
3651}
3652
3653pub(crate) fn datetime(batch: &RecordBatch, name: &str, row: usize) -> Result<DateTime<Utc>> {
3654 let array = batch
3655 .column_by_name(name)
3656 .with_context(|| format!("missing column {name}"))?
3657 .as_any()
3658 .downcast_ref::<TimestampMicrosecondArray>()
3659 .with_context(|| format!("column {name} is not timestamp_micros"))?;
3660 Utc.timestamp_micros(array.value(row))
3661 .single()
3662 .context("timestamp is out of range")
3663}
3664
3665fn primary_field(name: &str, data_type: DataType, nullable: bool) -> Field {
3666 Field::new(name, data_type, nullable).with_metadata(
3667 [(
3668 "lance-schema:unenforced-primary-key".to_owned(),
3669 "true".to_owned(),
3670 )]
3671 .into(),
3672 )
3673}
3674
3675fn legacy_blob_field(name: &str, nullable: bool) -> Field {
3685 Field::new(name, DataType::LargeBinary, nullable).with_metadata(
3686 [(lance_arrow::BLOB_META_KEY.to_owned(), "true".to_owned())]
3687 .into_iter()
3688 .collect(),
3689 )
3690}
3691
3692fn json_field(name: &str, nullable: bool) -> Field {
3693 lance_arrow::json::json_field(name, nullable)
3694}
3695
3696fn micros(timestamp: DateTime<Utc>) -> i64 {
3697 timestamp.timestamp_micros()
3698}
3699
3700fn json_bytes<T: Serialize>(value: &T) -> Result<Vec<u8>> {
3701 let text = serde_json::to_string(value).context("failed to serialize JSON field")?;
3709 lance_arrow::json::encode_json(&text)
3710 .map_err(|err| anyhow::anyhow!("failed to encode JSON field as JSONB: {err}"))
3711}
3712
3713fn json_parse<T: DeserializeOwned>(value: &[u8]) -> Result<T> {
3714 serde_json::from_slice(value).context("failed to parse JSON field")
3715}
3716
3717fn part_variant_json(kind: &PartKind) -> Result<Vec<u8>> {
3718 if let PartKind::File {
3719 media_type,
3720 file_name,
3721 data,
3722 } = kind
3723 {
3724 let data_kind = match data {
3725 FileData::String(_) => "string",
3726 FileData::Bytes(_) => "bytes",
3727 FileData::Url(_) => "url",
3728 };
3729 return json_bytes(&serde_json::json!({
3730 "media_type": media_type,
3731 "file_name": file_name,
3732 "data_kind": data_kind,
3733 }));
3734 }
3735 let value = serde_json::to_value(kind)?;
3736 let mut object = value
3737 .as_object()
3738 .cloned()
3739 .context("part variant did not serialize to an object")?;
3740 object.remove("type");
3741 json_bytes(&object)
3742}
3743
3744fn part_kind_from_json(
3745 type_name: &str,
3746 variant_data: &[u8],
3747 file_data: Option<FileData>,
3748) -> Result<PartKind> {
3749 let mut value = json_parse::<Value>(variant_data)?;
3750 let object = value
3751 .as_object_mut()
3752 .context("part variant data is not an object")?;
3753 object.insert("type".to_owned(), Value::String(type_name.to_owned()));
3754 if let Some(data) = file_data {
3755 object.remove("data_kind");
3756 object.insert("data".to_owned(), serde_json::to_value(data)?);
3757 }
3758 serde_json::from_value(value).context("failed to parse part kind")
3759}
3760
#[cfg(test)]
mod tests {
    #![allow(clippy::expect_used, clippy::unwrap_used)]

    use super::*;
    use crate::{
        adapter::Extracted,
        handlers::ingest_events,
        wire::{FileData, Message, Part, PartKind, ProviderOptions, Session},
    };
    use chrono::Utc;
    use serde_json::json;
    use tempfile::TempDir;

    /// Minimal `claude-code` session with the given id, project `/tmp/pond`,
    /// no parent, and empty provider options.
    fn synthetic_session(id: &str) -> Session {
        Session {
            id: id.to_owned(),
            parent_session_id: None,
            parent_message_id: None,
            source_agent: "claude-code".to_owned(),
            created_at: Utc::now(),
            project: crate::adapter::Extracted::from_test_value("/tmp/pond".to_owned()),
            options: ProviderOptions::new(),
        }
    }

    /// `search_text` keeps conversational text, and yields `None` when a
    /// message's only part is injected.
    #[test]
    fn search_text_excludes_injected_parts() {
        use crate::wire::Provenance;
        let message = Message::User {
            id: "m1".to_owned(),
            session_id: "s1".to_owned(),
            timestamp: Utc::now(),
            options: ProviderOptions::new(),
        };
        let text_part = |id: &str, text: &str, provenance: Provenance| Part {
            session_id: "s1".to_owned(),
            id: id.to_owned(),
            message_id: "m1".to_owned(),
            ordinal: 0,
            provenance,
            options: ProviderOptions::new(),
            kind: PartKind::Text {
                text: Some(Extracted::from_test_value(text.to_owned())),
            },
        };

        let conversational = search_text(
            &message,
            &[text_part(
                "p1",
                "real human prompt",
                Provenance::Conversational,
            )],
        );
        assert_eq!(conversational.as_deref(), Some("real human prompt"));

        let injected = search_text(
            &message,
            &[text_part(
                "p2",
                "<task-notification>...</task-notification>",
                Provenance::Injected,
            )],
        );
        assert!(
            injected.is_none(),
            "a message whose only part is injected has null search_text"
        );
    }

    /// `chunk_ranges` keeps small rows together and splits whenever adding a
    /// row would exceed `COLUMN_BYTE_BUDGET`, including oversized single rows.
    #[test]
    fn chunk_ranges_splits_on_byte_budget() {
        assert!(chunk_ranges(&[]).is_empty());
        assert_eq!(chunk_ranges(&[10, 10, 10]), vec![0..3]);

        let two_thirds = COLUMN_BYTE_BUDGET * 2 / 3;
        assert_eq!(
            chunk_ranges(&[two_thirds, two_thirds, two_thirds]),
            vec![0..1, 1..2, 2..3],
        );

        assert_eq!(
            chunk_ranges(&[10, COLUMN_BYTE_BUDGET + 1, 10]),
            vec![0..1, 1..2, 2..3],
        );
    }

    /// A part whose message has not been seen is rejected on its own. The
    /// surrounding valid events still commit.
    #[tokio::test]
    async fn ordering_violation_drops_only_the_offending_event() -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let store = Store::open_local(temp.path()).await?;
        let session = synthetic_session("ordering");
        let orphan_part = Part {
            session_id: session.id.clone(),
            id: "orphan-part".to_owned(),
            message_id: "missing-message".to_owned(),
            ordinal: 0,
            provenance: crate::wire::Provenance::Conversational,
            options: ProviderOptions::new(),
            kind: PartKind::Text {
                text: Some(Extracted::from_test_value("orphan".to_owned())),
            },
        };
        let valid_message = Message::User {
            id: "valid-message".to_owned(),
            session_id: session.id.clone(),
            timestamp: Utc::now(),
            options: ProviderOptions::new(),
        };
        let valid_part = Part {
            session_id: session.id.clone(),
            id: "valid-part".to_owned(),
            message_id: valid_message.id().to_owned(),
            ordinal: 0,
            provenance: crate::wire::Provenance::Conversational,
            options: ProviderOptions::new(),
            kind: PartKind::Text {
                text: Some(Extracted::from_test_value("kept".to_owned())),
            },
        };

        let mut validator = IngestValidator::default();
        validator
            .push(&store, 0, IngestEvent::Session(session.clone()))
            .await?;
        let part_outcomes = validator
            .push(&store, 1, IngestEvent::Part(orphan_part))
            .await?;
        assert_eq!(part_outcomes.len(), 1);
        assert_eq!(part_outcomes[0].kind, "part");
        assert_eq!(part_outcomes[0].status, OutcomeStatus::Error);
        assert!(
            part_outcomes[0]
                .error
                .as_ref()
                .map(|e| e.message.contains("part event appeared before a message"))
                .unwrap_or(false),
            "error message must explain the ordering violation: {part_outcomes:?}"
        );
        validator
            .push(&store, 2, IngestEvent::Message(valid_message))
            .await?;
        validator
            .push(&store, 3, IngestEvent::Part(valid_part))
            .await?;
        validator.finish(&store).await?;

        let (sessions, messages, parts) = store.row_counts().await?;
        assert_eq!(sessions, 1, "session committed despite the orphan part");
        assert_eq!(messages, 1, "valid message committed");
        assert_eq!(parts, 1, "valid part committed; the orphan was dropped");

        Ok(())
    }

    /// A second message that reuses an id is rejected with an error naming the
    /// id, and only the first message is stored.
    #[tokio::test]
    async fn duplicate_message_id_drops_the_second_keeps_the_first() -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let store = Store::open_local(temp.path()).await?;
        let session = synthetic_session("duplicate-message");
        let first = Message::User {
            id: "message-1".to_owned(),
            session_id: session.id.clone(),
            timestamp: Utc::now(),
            options: ProviderOptions::new(),
        };
        let second = Message::Assistant {
            id: "message-1".to_owned(),
            session_id: session.id.clone(),
            timestamp: Utc::now(),
            options: ProviderOptions::new(),
        };

        let mut validator = IngestValidator::default();
        validator
            .push(&store, 0, IngestEvent::Session(session.clone()))
            .await?;
        validator
            .push(&store, 1, IngestEvent::Message(first))
            .await?;
        let dup_outcomes = validator
            .push(&store, 2, IngestEvent::Message(second))
            .await?;
        assert_eq!(dup_outcomes.len(), 1);
        assert_eq!(dup_outcomes[0].status, OutcomeStatus::Error);
        assert!(
            dup_outcomes[0]
                .error
                .as_ref()
                .map(|e| e.message.contains("duplicate message id message-1"))
                .unwrap_or(false),
            "duplicate-id rejection must name the offending id: {dup_outcomes:?}"
        );

        validator.finish(&store).await?;
        let (sessions, messages, _) = store.row_counts().await?;
        assert_eq!(sessions, 1, "session committed");
        assert_eq!(messages, 1, "only the first message committed");

        Ok(())
    }

    /// `optimize_indices` succeeds on a parts table holding file parts of every
    /// `FileData` representation, written in two separate upserts, and no row
    /// is lost along the way.
    #[tokio::test(flavor = "multi_thread")]
    async fn optimize_indices_compacts_parts_with_blob_column() -> anyhow::Result<()> {
        use crate::wire::Provenance;
        let temp = TempDir::new()?;
        let store = Store::open_local(temp.path()).await?;

        let session = synthetic_session("compact-blob");
        store
            .upsert_sessions(std::slice::from_ref(&session))
            .await?;

        let make_part = |idx: usize, kind: PartKind| Part {
            session_id: session.id.clone(),
            message_id: format!("msg-{idx}"),
            id: format!("part-{idx}"),
            ordinal: 0,
            provenance: Provenance::Conversational,
            options: ProviderOptions::new(),
            kind,
        };

        let batch_a = vec![
            make_part(
                0,
                PartKind::File {
                    media_type: "text/plain".to_owned(),
                    file_name: Some("a.txt".to_owned()),
                    data: FileData::Bytes(b"alpha".to_vec()),
                },
            ),
            make_part(
                1,
                PartKind::File {
                    media_type: "text/plain".to_owned(),
                    file_name: Some("b.txt".to_owned()),
                    data: FileData::String("beta".to_owned()),
                },
            ),
        ];
        store.upsert_parts(&batch_a).await?;

        let batch_b = vec![
            make_part(
                2,
                PartKind::File {
                    media_type: "application/octet-stream".to_owned(),
                    file_name: None,
                    data: FileData::Url("https://example.com/file".to_owned()),
                },
            ),
            make_part(
                3,
                PartKind::File {
                    media_type: "image/png".to_owned(),
                    file_name: Some("c.png".to_owned()),
                    data: FileData::Bytes(vec![0x89, 0x50, 0x4e, 0x47]),
                },
            ),
        ];
        store.upsert_parts(&batch_b).await?;

        store.optimize_indices(None, None).await?.into_result()?;

        // Compaction must rewrite, not lose, the blob-bearing rows.
        let (_, _, parts) = store.row_counts().await?;
        assert_eq!(parts, 4, "all four file parts survive compaction");

        Ok(())
    }

    /// A file part with byte payload survives ingest and reads back unchanged
    /// through `get_session`.
    #[tokio::test]
    async fn file_part_blob_v2_round_trips_through_get() -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let store = Store::open_local(temp.path()).await?;
        let session = synthetic_session("blob");
        let message = Message::User {
            id: "message-1".to_owned(),
            session_id: session.id.clone(),
            timestamp: Utc::now(),
            options: ProviderOptions::new(),
        };
        let part = Part {
            session_id: session.id.clone(),
            id: "part-1".to_owned(),
            message_id: message.id().to_owned(),
            ordinal: 0,
            provenance: crate::wire::Provenance::Conversational,
            options: ProviderOptions::new(),
            kind: PartKind::File {
                media_type: "text/plain".to_owned(),
                file_name: Some("payload.txt".to_owned()),
                data: FileData::Bytes(b"pond".to_vec()),
            },
        };

        let mut validator = IngestValidator::default();
        validator
            .push(&store, 0, IngestEvent::Session(session.clone()))
            .await?;
        validator
            .push(&store, 1, IngestEvent::Message(message.clone()))
            .await?;
        validator
            .push(&store, 2, IngestEvent::Part(part.clone()))
            .await?;
        validator.finish(&store).await?;

        let stored = store
            .get_session(&session.id)
            .await?
            .expect("session should exist");
        let stored_part = &stored.messages[0].parts[0];
        assert_eq!(stored_part, &part);

        Ok(())
    }

    /// Fixed-id session used by the immutable-field re-ingest tests.
    fn base_session() -> Session {
        Session {
            id: "01HXY00000000001".to_owned(),
            parent_session_id: None,
            parent_message_id: None,
            source_agent: "claude-code".to_owned(),
            created_at: Utc::now(),
            project: crate::adapter::Extracted::from_test_value("/home/me/proj".to_owned()),
            options: ProviderOptions::new(),
        }
    }

    /// Number of outcomes whose status equals `target`.
    fn count_status(outcomes: &[RowOutcome], target: OutcomeStatus) -> usize {
        outcomes
            .iter()
            .filter(|outcome| outcome.status == target)
            .count()
    }

    /// Re-ingesting with only mutable fields changed reports `Matched` and no
    /// error.
    #[tokio::test(flavor = "multi_thread")]
    async fn re_ingesting_a_session_with_unchanged_immutable_fields_is_idempotent()
    -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let store = Store::open_local(temp.path()).await?;

        let first = ingest_events(&store, vec![IngestEvent::Session(base_session())]).await?;
        assert_eq!(count_status(&first, OutcomeStatus::Inserted), 1);

        let mut again = base_session();
        again.options.insert("title".to_owned(), json!("renamed"));
        let second = ingest_events(&store, vec![IngestEvent::Session(again)]).await?;
        assert_eq!(
            count_status(&second, OutcomeStatus::Error),
            0,
            "options is mutable; the re-ingest must not surface an error: {second:?}",
        );
        assert_eq!(
            count_status(&second, OutcomeStatus::Matched),
            1,
            "unchanged immutable fields must match-insert via merge_insert",
        );

        Ok(())
    }

    /// Changing `source_agent` on re-ingest yields an `immutable` error on that
    /// field and leaves the stored row untouched.
    #[tokio::test(flavor = "multi_thread")]
    async fn re_ingesting_with_changed_source_agent_is_rejected() -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let store = Store::open_local(temp.path()).await?;

        let first = ingest_events(&store, vec![IngestEvent::Session(base_session())]).await?;
        assert_eq!(count_status(&first, OutcomeStatus::Error), 0);

        let mut tampered = base_session();
        tampered.source_agent = "codex-cli".to_owned();
        let second = ingest_events(&store, vec![IngestEvent::Session(tampered)]).await?;
        assert_eq!(count_status(&second, OutcomeStatus::Error), 1);
        let err_row = second
            .iter()
            .find(|outcome| outcome.status == OutcomeStatus::Error)
            .expect("error outcome present");
        let err = err_row.error.as_ref().expect("error body present");
        assert_eq!(err.field, Some("source_agent"));
        assert_eq!(err.reason, Some("immutable"));

        let stored = store
            .get_session(&base_session().id)
            .await?
            .expect("session row survives the rejected re-ingest");
        assert_eq!(stored.session.source_agent, "claude-code");

        Ok(())
    }

    /// Changing `project` on re-ingest yields an error on that field and leaves
    /// the stored project unchanged.
    #[tokio::test(flavor = "multi_thread")]
    async fn re_ingesting_with_changed_project_is_rejected() -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let store = Store::open_local(temp.path()).await?;

        let first = ingest_events(&store, vec![IngestEvent::Session(base_session())]).await?;
        assert_eq!(count_status(&first, OutcomeStatus::Error), 0);

        let mut tampered = base_session();
        tampered.project = crate::adapter::Extracted::from_test_value("/somewhere/else".to_owned());
        let second = ingest_events(&store, vec![IngestEvent::Session(tampered)]).await?;
        let err_row = second
            .iter()
            .find(|outcome| outcome.status == OutcomeStatus::Error)
            .expect("project change must surface an error outcome");
        assert_eq!(err_row.error.as_ref().unwrap().field, Some("project"));

        let stored = store
            .get_session(&base_session().id)
            .await?
            .expect("session row survives");
        assert_eq!(
            stored.session.project.as_str(),
            "/home/me/proj",
            "stored project must remain the original",
        );

        Ok(())
    }

    /// Shorthand for [`store_with_messages_at_threshold`] with the production
    /// `VECTOR_INDEX_ACTIVATION_ROWS`.
    async fn store_with_messages(
        temp: &TempDir,
        count: usize,
    ) -> anyhow::Result<(Store, Vec<MessageKey>)> {
        store_with_messages_at_threshold(temp, count, VECTOR_INDEX_ACTIVATION_ROWS).await
    }

    /// Opens a store under `temp` and ingests `count` user messages, each with
    /// one text part. Messages are spread round-robin over `min(8, max(count, 1))`
    /// sessions, so message `i` lands in `session-{i % sessions}`.
    ///
    /// Returns the store and the keys of messages `0..count`, in index order.
    ///
    /// `_vector_threshold` is currently unused. Tests apply their threshold via
    /// `optimize_indices_with_vector_threshold` instead.
    async fn store_with_messages_at_threshold(
        temp: &TempDir,
        count: usize,
        _vector_threshold: usize,
    ) -> anyhow::Result<(Store, Vec<MessageKey>)> {
        let store = Store::open_local(temp.path()).await?;
        let sessions = 8.min(count.max(1));
        let mut events = Vec::new();
        for s in 0..sessions {
            events.push(IngestEvent::Session(Session {
                id: format!("session-{s}"),
                parent_session_id: None,
                parent_message_id: None,
                source_agent: "claude-code".to_owned(),
                created_at: Utc::now(),
                project: Extracted::from_test_value(format!("/proj/{}", s % 4)),
                options: ProviderOptions::new(),
            }));
            for i in (s..count).step_by(sessions) {
                let message_id = format!("msg-{i}");
                events.push(IngestEvent::Message(Message::User {
                    id: message_id.clone(),
                    session_id: format!("session-{s}"),
                    timestamp: Utc::now(),
                    options: ProviderOptions::new(),
                }));
                events.push(IngestEvent::Part(Part {
                    session_id: format!("session-{s}"),
                    id: format!("{message_id}-part"),
                    message_id,
                    ordinal: 0,
                    provenance: crate::wire::Provenance::Conversational,
                    options: ProviderOptions::new(),
                    kind: PartKind::Text {
                        text: Some(Extracted::from_test_value(format!("synthetic message {i}"))),
                    },
                }));
            }
        }
        ingest_events(&store, events).await?;
        let keys = (0..count)
            .map(|i| MessageKey {
                session_id: format!("session-{}", i % sessions),
                message_id: format!("msg-{i}"),
            })
            .collect();
        Ok((store, keys))
    }

    /// Deterministic pseudo-random embedding of length `embedding_dim()` for
    /// `seed`. Components are centred on zero, within roughly `[-1.0, 1.0]`.
    fn synthetic_vector(seed: usize) -> Vec<f32> {
        let mut state = (seed as u64)
            .wrapping_mul(0x9E37_79B9_7F4A_7C15)
            .wrapping_add(1);
        (0..embedding_dim())
            .map(|_| {
                state = state.wrapping_mul(6364136223846793005).wrapping_add(1);
                // The top 31 bits of the LCG state give `unit` in [0, 1). Scale to
                // [0, 2) before subtracting 1 so components span both signs;
                // `unit - 1.0` alone would put every vector in the all-negative
                // orthant and make them highly correlated.
                #[allow(clippy::cast_precision_loss)]
                let unit = (state >> 33) as f32 / (1u64 << 31) as f32;
                2.0 * unit - 1.0
            })
            .collect()
    }

    /// Pairs each key with `synthetic_vector(position)`.
    fn embedded(keys: &[MessageKey]) -> Vec<EmbeddedMessage> {
        keys.iter()
            .enumerate()
            .map(|(seed, key)| EmbeddedMessage {
                session_id: key.session_id.clone(),
                id: key.message_id.clone(),
                vector: synthetic_vector(seed),
            })
            .collect()
    }

    /// Builds the batch from `embedding_update_batch`, keeping its first three
    /// columns and replacing the rest with one string column that holds `model`
    /// for every row.
    ///
    /// The original schema is reused, so it presumably has exactly four columns
    /// with a string fourth column (the model id). TODO confirm.
    fn embedding_update_batch_with_model(
        rows: &[EmbeddedMessage],
        model: &str,
    ) -> Result<RecordBatch> {
        let mut batch = embedding_update_batch(rows)?;
        let columns = batch
            .columns()
            .iter()
            .take(3)
            .cloned()
            .chain(std::iter::once(
                Arc::new(StringArray::from(vec![model; rows.len()])) as _,
            ))
            .collect::<Vec<_>>();
        batch = RecordBatch::try_new(batch.schema(), columns)?;
        Ok(batch)
    }

    /// A vector search filtered on `session_id` uses a scalar index query
    /// rather than a post-filter `FilterExec`.
    #[tokio::test]
    async fn filtered_vector_scan_pushes_scalar_predicate_into_the_index() -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let (store, keys) = store_with_messages(&temp, 4).await?;
        store.write_embeddings(&embedded(&keys)).await?;
        store.optimize_indices(None, None).await?.into_result()?;

        let query = vec![0.01_f32; embedding_dim()];
        let plan = store
            .explain_vector_plan(
                &query,
                10,
                &Predicate::Eq("session_id", "session-3".into()),
                None,
            )
            .await?;

        assert!(
            plan.contains("ScalarIndexQuery"),
            "expected a ScalarIndexQuery node in the plan:\n{plan}",
        );
        let predicate_postfiltered = plan
            .lines()
            .any(|line| line.contains("FilterExec") && line.contains("session_id"));
        assert!(
            !predicate_postfiltered,
            "the scalar predicate must not fall back to a FilterExec postfilter:\n{plan}",
        );
        Ok(())
    }

    /// With 255 embedded rows no IVF_PQ index is built at a threshold of 256.
    /// With the 256th row, optimize builds it and it serves searches.
    #[tokio::test]
    async fn vector_index_activates_when_threshold_is_crossed() -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let (store, keys) = store_with_messages_at_threshold(&temp, 300, 256).await?;

        store.write_embeddings(&embedded(&keys[..255])).await?;
        store
            .optimize_indices_with_vector_threshold(256)
            .await?
            .into_result()?;
        assert!(
            !store
                .handle
                .messages_index_names()
                .await?
                .iter()
                .any(|name| name == MESSAGES_VECTOR_INDEX),
            "IVF_PQ must not exist below the activation threshold",
        );

        store.write_embeddings(&embedded(&keys[255..256])).await?;
        store
            .optimize_indices_with_vector_threshold(256)
            .await?
            .into_result()?;
        assert!(
            store
                .handle
                .messages_index_names()
                .await?
                .iter()
                .any(|name| name == MESSAGES_VECTOR_INDEX),
            "optimize must create the IVF_PQ once the threshold is crossed",
        );

        let hits = store
            .vector_search(&synthetic_vector(0), 10, &Predicate::And(Vec::new()), None)
            .await?;
        assert!(
            hits.iter().any(|(key, _)| key == &keys[0]),
            "an embedded row is retrievable via the index",
        );
        Ok(())
    }

    /// Rows embedded under another model id count as stale, show up in the
    /// pending/stale stream, and become current after re-embedding. The IVF_PQ
    /// index is rebuilt afterwards.
    #[tokio::test]
    async fn model_swap_force_re_embeds_only_stale_rows_and_rebuilds_ivf_pq() -> anyhow::Result<()>
    {
        let temp = TempDir::new()?;
        let (store, keys) = store_with_messages_at_threshold(&temp, 300, 256).await?;
        let old_rows = embedded(&keys);
        let old_batch = embedding_update_batch_with_model(&old_rows, "old-model")?;
        store
            .handle
            .merge_update(Table::Messages, old_batch, old_rows.len())
            .await?;
        store
            .optimize_indices_with_vector_threshold(256)
            .await?
            .into_result()?;
        assert!(
            store
                .handle
                .messages_index_names()
                .await?
                .iter()
                .any(|name| name == MESSAGES_VECTOR_INDEX),
            "IVF_PQ must exist before a model swap",
        );
        assert_eq!(store.stale_embedding_count().await?, keys.len());

        store.drop_vector_index().await?;
        let mut pending = Vec::new();
        let stream = store.pending_or_stale_messages();
        tokio::pin!(stream);
        while let Some(row) = stream.next().await {
            pending.push(row?);
        }
        assert_eq!(
            pending.len(),
            keys.len(),
            "force stream should see stale rows"
        );
        store.write_embeddings(&embedded(&keys)).await?;
        assert_eq!(store.stale_embedding_count().await?, 0);
        store
            .optimize_indices_with_vector_threshold(256)
            .await?
            .into_result()?;
        assert!(
            store
                .handle
                .messages_index_names()
                .await?
                .iter()
                .any(|name| name == MESSAGES_VECTOR_INDEX),
            "optimize must rebuild IVF_PQ after force re-embed",
        );

        let stream = store.pending_or_stale_messages();
        tokio::pin!(stream);
        assert!(stream.next().await.is_none(), "up-to-date rows are skipped");
        Ok(())
    }

    /// After the sessions table's old versions are cleaned up,
    /// `session_last_ingested_at` still returns an entry for every session.
    #[tokio::test]
    async fn session_last_ingested_at_falls_back_when_versions_pruned() -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let (store, _keys) = store_with_messages(&temp, 4).await?;

        for tag in 0..3 {
            let extra = synthetic_session(&format!("extra-{tag}"));
            store.upsert_sessions(&[extra]).await?;
        }

        let dataset = store.handle.dataset(Table::Sessions).await?;
        dataset
            .cleanup_old_versions(chrono::Duration::zero(), None, Some(false))
            .await
            .context("cleanup_old_versions failed")?;

        let map = store.session_last_ingested_at().await?;
        let session_count = store.row_counts().await?.0;
        assert!(
            map.len() >= session_count,
            "watermark map ({}) must still cover every session ({}) after \
             version cleanup; an empty fallback regresses pond sync to a \
             full re-scan",
            map.len(),
            session_count,
        );
        Ok(())
    }

    /// `embedding_progress` reports embedded/total counts as embeddings are
    /// written in two steps, under the current `embed::model_id()`.
    #[tokio::test]
    async fn embedding_progress_counts_embedded_and_eligible_rows() -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let (store, keys) = store_with_messages(&temp, 10).await?;

        let before = store.embedding_progress().await?;
        assert_eq!(before.embedded, 0);
        assert_eq!(before.total, 10);
        assert_eq!(before.model, crate::embed::model_id());

        store.write_embeddings(&embedded(&keys[..4])).await?;
        let partial = store.embedding_progress().await?;
        assert_eq!(partial.embedded, 4);
        assert_eq!(partial.total, 10);

        store.write_embeddings(&embedded(&keys[4..])).await?;
        let full = store.embedding_progress().await?;
        assert_eq!(full.embedded, 10);
        assert_eq!(full.total, 10);
        Ok(())
    }
}