1use std::{
2 collections::{BTreeMap, HashMap, HashSet},
3 path::Path,
4 sync::Arc,
5};
6
7use anyhow::{Context, Result};
8use async_stream::try_stream;
9use chrono::{DateTime, TimeZone, Utc};
10use lance::Dataset;
11use lance::dataset::{AutoCleanupParams, WriteMode, WriteParams};
12use lance::deps::arrow_array::{
13 Array, FixedSizeListArray, Float16Array, Float32Array, Int32Array, LargeBinaryArray,
14 LargeStringArray, RecordBatch, RecordBatchIterator, StringArray, TimestampMicrosecondArray,
15 UInt64Array, new_null_array,
16};
17use lance::deps::arrow_schema::{DataType, Field, Schema, TimeUnit};
18use lance_file::version::LanceFileVersion;
19use lance_index::scalar::{BuiltinIndexType, FullTextSearchQuery};
20use serde::{Deserialize, Serialize, de::DeserializeOwned};
21use serde_json::Value;
22use tokio_stream::{Stream, StreamExt};
23
24use crate::{
25 config, embed,
26 substrate::{
27 Handle, IndexIntent, IndexParamsKind, IndexStatus, IndexTrigger, MaintenancePolicy,
28 OptimizeProgressFn, PhaseOutcome, Predicate, ScalarValue, ScanOpts, Table,
29 TableOptimizeOutcome, TableSizes, VECTOR_INDEX_ACTIVATION_ROWS,
30 },
31 wire::{
32 FileData, Message, Part, PartKind, ResponseMode, Role, SUMMARY_PART_TYPES, Session,
33 SessionFrom,
34 },
35};
36use url::Url;
37
/// Archive store for sessions, messages and parts.
///
/// Owns a single substrate [`Handle`]; every table read and write performed by
/// the methods on this type goes through that handle.
#[derive(Debug)]
pub struct Store {
    handle: Handle,
}
42
/// Per-table row counts reported by a Lance archive export or import.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LanceArchiveCounts {
    /// Row count for the sessions table.
    pub sessions: usize,
    /// Row count for the messages table.
    pub messages: usize,
    /// Row count for the parts table.
    pub parts: usize,
}

/// Dataset version ids of the live tables, read just before each table was
/// scanned for export.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LanceArchiveVersions {
    /// Version id of the sessions dataset.
    pub sessions: u64,
    /// Version id of the messages dataset.
    pub messages: u64,
    /// Version id of the parts dataset.
    pub parts: u64,
}

/// Result of exporting the live tables into standalone Lance datasets.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LanceArchiveExport {
    /// Number of rows written per exported table.
    pub rows: LanceArchiveCounts,
    /// Version each live table was at when it was exported.
    pub source_versions: LanceArchiveVersions,
}

/// Result of importing archived Lance datasets into the live tables.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LanceArchiveImport {
    /// Number of rows read from each archive table.
    pub rows: LanceArchiveCounts,
    /// Number of rows the merge-insert reported as newly inserted per table
    /// (presumably the remainder matched existing rows — confirm against
    /// `Handle::merge_insert`).
    pub inserted: LanceArchiveCounts,
}
68
/// Index intents grouped by the table they apply to.
#[derive(Debug, Clone, Default)]
pub struct IndexIntents {
    /// Intents for the sessions table.
    pub sessions: Vec<IndexIntent>,
    /// Intents for the messages table.
    pub messages: Vec<IndexIntent>,
    /// Intents for the parts table.
    pub parts: Vec<IndexIntent>,
}
75
76impl IndexIntents {
77 fn all(&self) -> [(Table, &[IndexIntent]); 3] {
78 [
79 (Table::Sessions, &self.sessions),
80 (Table::Messages, &self.messages),
81 (Table::Parts, &self.parts),
82 ]
83 }
84}
85
/// A message row selected for (re-)embedding: its key plus the text to embed.
#[derive(Debug, Clone, PartialEq)]
pub struct PendingMessage {
    /// Session the message belongs to.
    pub session_id: String,
    /// Message id within that session.
    pub id: String,
    /// The message's non-null `search_text` column.
    pub search_text: String,
}

/// A computed embedding to be written back to a message row by
/// `Store::write_embeddings`.
#[derive(Debug, Clone, PartialEq)]
pub struct EmbeddedMessage {
    /// Session the message belongs to.
    pub session_id: String,
    /// Message id within that session.
    pub id: String,
    /// Embedding vector for the message.
    pub vector: Vec<f32>,
}
104
/// Descriptive columns of a single message row.
// NOTE(review): not constructed in the visible part of this file; field meanings
// are inferred from the messages-table column names — confirm.
#[derive(Debug, Clone, PartialEq)]
pub struct MessageMeta {
    pub message_id: String,
    pub session_id: String,
    pub role: String,
    pub project: String,
    pub source_agent: String,
    pub timestamp: DateTime<Utc>,
    pub search_text: String,
}

/// Composite key identifying a message: `(session_id, message_id)`.
///
/// Field order is significant: the derived `Ord` sorts by session first, then
/// by message id.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct MessageKey {
    pub session_id: String,
    pub message_id: String,
}

/// Per-row outcome of an upsert.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UpsertStatus {
    /// The row did not exist before and was inserted.
    Inserted,
    /// The row matched an existing row (presumably by primary key — confirm
    /// against `statuses_from_inserted`).
    Matched,
}
128
129#[derive(Debug, Default)]
134pub struct OptimizeOutcome {
135 pub tables: Vec<TableOptimizeOutcome>,
136}
137
138impl OptimizeOutcome {
139 pub fn any_indices_failed(&self) -> bool {
142 self.tables.iter().any(|t| t.indices.is_failed())
143 }
144
145 pub fn into_result(self) -> Result<Self> {
149 for table in &self.tables {
150 if let PhaseOutcome::Failed(error) = &table.indices {
151 anyhow::bail!(
152 "indices phase failed on {}: {error:#}",
153 table.table.as_str()
154 );
155 }
156 if let PhaseOutcome::Failed(error) = &table.compaction {
157 anyhow::bail!(
158 "compaction phase failed on {}: {error:#}",
159 table.table.as_str()
160 );
161 }
162 }
163 Ok(self)
164 }
165}
166
/// Corpus-wide statistics produced by `Store::corpus_stats`.
#[derive(Debug, Clone)]
pub struct CorpusStats {
    /// Location of the underlying data store.
    pub data_url: Url,
    /// Raw row counts of the three tables. These are taken from the handle and
    /// are not filtered by `include_subagents`.
    pub totals: RowTotals,
    /// Per-source-agent breakdown, ordered by adapter name.
    pub adapters: Vec<AdapterStats>,
    /// Whether messages from subagent source agents (names containing '/')
    /// were included in `adapters`.
    pub include_subagents: bool,
}

/// Row counts of the sessions, messages and parts tables.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RowTotals {
    pub sessions: u64,
    pub messages: u64,
    pub parts: u64,
}

/// Progress snapshot of an embedding run.
// NOTE(review): not constructed in the visible part of this file; presumably
// `embedded` out of `total` messages are done using `model` — confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EmbeddingProgress {
    pub embedded: usize,
    pub total: usize,
    pub model: &'static str,
}

/// Message and session counts for one source agent ("adapter").
#[derive(Debug, Clone)]
pub struct AdapterStats {
    /// The `source_agent` value.
    pub adapter: String,
    /// Sum of the per-project distinct-session counts.
    pub sessions: u64,
    /// Total messages across all of this adapter's projects.
    pub messages: u64,
    /// Per-project breakdown, sorted by message count (descending), then by
    /// project name.
    pub projects: Vec<ProjectStats>,
}

/// Message and distinct-session counts for one (adapter, project) pair.
#[derive(Debug, Clone)]
pub struct ProjectStats {
    pub project: String,
    pub sessions: u64,
    pub messages: u64,
}

/// Running totals for one (source_agent, project) group while scanning
/// messages.
#[derive(Default)]
struct GroupAccumulator {
    /// Number of messages seen in the group.
    messages: u64,
    /// Distinct session ids seen in the group.
    session_ids: HashSet<String>,
}
231
/// Borrowed input for `Store::upsert_messages`: one message, its parts, and the
/// optional text to store in the `search_text` column.
// NOTE(review): `upsert_messages` only reads `message` and `search_text`; `parts`
// is not written there. Confirm callers persist parts via `upsert_parts`.
#[derive(Debug, Clone, Copy)]
pub struct MessageWrite<'a> {
    pub message: &'a Message,
    pub parts: &'a [Part],
    pub search_text: Option<&'a str>,
}
238
239impl Store {
240 pub async fn open(location: &Url) -> Result<Self> {
246 Ok(Self {
247 handle: Handle::open(location).await?,
248 })
249 }
250
251 pub async fn open_with_options(
257 location: &Url,
258 storage_options: std::collections::HashMap<String, String>,
259 caps: crate::substrate::RuntimeCaps,
260 ) -> Result<Self> {
261 Ok(Self {
262 handle: Handle::open_with_options(location, storage_options, caps).await?,
263 })
264 }
265
266 pub async fn open_local(path: impl AsRef<std::path::Path>) -> Result<Self> {
271 let url = config::url_for_path(path)?;
272 Self::open_with_options(
273 &url,
274 std::collections::HashMap::new(),
275 crate::substrate::RuntimeCaps::default(),
276 )
277 .await
278 }
279
280 pub async fn export_clean_lance_datasets(&self, dest: &Path) -> Result<LanceArchiveExport> {
288 std::fs::create_dir_all(dest)
289 .with_context(|| format!("failed to create archive staging dir {}", dest.display()))?;
290 let (sessions, sessions_version) = self
291 .export_clean_table(Table::Sessions, &dest.join("sessions.lance"))
292 .await?;
293 let (messages, messages_version) = self
294 .export_clean_table(Table::Messages, &dest.join("messages.lance"))
295 .await?;
296 let (parts, parts_version) = self
297 .export_clean_table(Table::Parts, &dest.join("parts.lance"))
298 .await?;
299 Ok(LanceArchiveExport {
300 rows: LanceArchiveCounts {
301 sessions,
302 messages,
303 parts,
304 },
305 source_versions: LanceArchiveVersions {
306 sessions: sessions_version,
307 messages: messages_version,
308 parts: parts_version,
309 },
310 })
311 }
312
313 pub async fn import_clean_lance_datasets(&self, source: &Path) -> Result<LanceArchiveImport> {
314 let sessions_dataset =
315 open_archive_table(Table::Sessions, &source.join("sessions.lance")).await?;
316 let messages_dataset =
317 open_archive_table(Table::Messages, &source.join("messages.lance")).await?;
318 let parts_dataset = open_archive_table(Table::Parts, &source.join("parts.lance")).await?;
319 let (sessions, sessions_inserted) = self
320 .import_clean_table(Table::Sessions, sessions_dataset)
321 .await?;
322 let (messages, messages_inserted) = self
323 .import_clean_table(Table::Messages, messages_dataset)
324 .await?;
325 let (parts, parts_inserted) = self.import_clean_table(Table::Parts, parts_dataset).await?;
326 Ok(LanceArchiveImport {
327 rows: LanceArchiveCounts {
328 sessions,
329 messages,
330 parts,
331 },
332 inserted: LanceArchiveCounts {
333 sessions: sessions_inserted,
334 messages: messages_inserted,
335 parts: parts_inserted,
336 },
337 })
338 }
339
340 async fn export_clean_table(&self, table: Table, dest: &Path) -> Result<(usize, u64)> {
341 let dataset = self.handle.dataset(table).await?;
342 let source_version = dataset.version_id();
343 let schema = export_schema(table);
344 let mut stream = dataset
345 .scan()
346 .try_into_stream()
347 .await
348 .with_context(|| format!("failed to scan {} for archive export", table.as_str()))?;
349 let dest_uri = dest
350 .to_str()
351 .with_context(|| format!("archive path is not UTF-8: {}", dest.display()))?;
352
353 let mut rows = 0usize;
354 let mut wrote = false;
355 while let Some(batch) = stream.next().await {
356 let batch = batch
357 .with_context(|| format!("failed to read {} archive batch", table.as_str()))?;
358 rows += batch.num_rows();
359 let reader = RecordBatchIterator::new([Ok(batch.clone())], batch.schema());
360 let mut params = write_params_for_create();
361 if wrote {
362 params.mode = WriteMode::Append;
363 }
364 Dataset::write(reader, dest_uri, Some(params))
365 .await
366 .with_context(|| format!("failed to write {} archive table", table.as_str()))?;
367 wrote = true;
368 }
369
370 if !wrote {
371 let batch = RecordBatch::new_empty(schema.clone());
372 let reader = RecordBatchIterator::new([Ok(batch)], schema);
373 Dataset::write(reader, dest_uri, Some(write_params_for_create()))
374 .await
375 .with_context(|| {
376 format!("failed to write empty {} archive table", table.as_str())
377 })?;
378 }
379 Ok((rows, source_version))
380 }
381
382 async fn import_clean_table(&self, table: Table, dataset: Dataset) -> Result<(usize, usize)> {
383 let mut stream = dataset
384 .scan()
385 .try_into_stream()
386 .await
387 .with_context(|| format!("failed to scan {} archive table", table.as_str()))?;
388 let mut rows = 0usize;
389 let mut inserted = 0usize;
390 while let Some(batch) = stream.next().await {
391 let batch = batch
392 .with_context(|| format!("failed to read {} archive batch", table.as_str()))?;
393 let row_count = batch.num_rows();
394 rows += row_count;
395 inserted += self
396 .handle
397 .merge_insert(table, batch, row_count)
398 .await
399 .with_context(|| format!("failed to import {} archive table", table.as_str()))?
400 as usize;
401 }
402 Ok((rows, inserted))
403 }
404
405 pub async fn upsert_sessions(&self, sessions: &[Session]) -> Result<Vec<UpsertStatus>> {
406 if sessions.is_empty() {
407 return Ok(Vec::new());
408 }
409 let batches = sessions_batches(sessions)?;
410 let inserted = merge_insert_chunks(&self.handle, Table::Sessions, batches).await?;
411 Ok(statuses_from_inserted(sessions.len(), inserted))
413 }
414
    /// Writes a batch of completed substreams (a session plus its buffered
    /// messages and parts) and reports a per-row outcome for each input.
    ///
    /// Steps:
    /// 1. Coalesce substreams that share a session id. The first occurrence wins.
    ///    A later one whose `source_agent` or `project` differs is rejected with
    ///    `IngestError::ImmutableField` outcomes. Otherwise its messages are
    ///    appended to the first occurrence, skipping message ids already present.
    /// 2. Pre-scan the sessions, messages and parts tables for rows belonging to
    ///    the batch's session ids.
    /// 3. Reject substreams whose stored session fails `ensure_immutable_match`.
    /// 4. Merge-insert the remaining sessions, messages and parts concurrently.
    ///    Success outcomes are computed against the pre-scan, and `counts` is
    ///    updated.
    ///
    /// Outcomes are returned sorted by their `index`.
    ///
    /// NOTE(review): during coalescing, messages skipped as duplicates, and the
    /// `session_index` of every non-first substream for a session, produce no
    /// outcome of their own. Confirm callers don't expect one outcome per input.
    /// The three merge-inserts are not transactional with each other: if one
    /// fails, the others may already have committed.
    async fn upsert_session_batch(
        &self,
        substreams: Vec<CompletedSubstream>,
    ) -> Result<(Vec<RowOutcome>, BatchCounts)> {
        if substreams.is_empty() {
            return Ok((Vec::new(), BatchCounts::default()));
        }

        let mut outcomes: Vec<RowOutcome> = Vec::with_capacity(substreams.len());
        let mut counts = BatchCounts::default();

        // Coalesce substreams by session id; `by_session_id` maps id -> index in `merged`.
        let mut merged: Vec<CompletedSubstream> = Vec::with_capacity(substreams.len());
        let mut by_session_id: std::collections::HashMap<String, usize> =
            std::collections::HashMap::with_capacity(substreams.len());
        for substream in substreams {
            if let Some(&existing_idx) = by_session_id.get(&substream.session.id) {
                let existing = &merged[existing_idx];
                // Immutable session fields must agree across substreams of the same session.
                if existing.session.source_agent != substream.session.source_agent
                    || existing.session.project != substream.session.project
                {
                    // source_agent mismatches are reported in preference to project ones.
                    let reason = if existing.session.source_agent != substream.session.source_agent
                    {
                        IngestError::ImmutableField {
                            field: "source_agent",
                            session_id: substream.session.id.clone(),
                            stored: existing.session.source_agent.clone(),
                            attempted: substream.session.source_agent.clone(),
                        }
                    } else {
                        // `project` is dereferenced before cloning (presumably a shared
                        // pointer such as Arc<String> — confirm).
                        IngestError::ImmutableField {
                            field: "project",
                            session_id: substream.session.id.clone(),
                            stored: (*existing.session.project).clone(),
                            attempted: (*substream.session.project).clone(),
                        }
                    };
                    let field = match &reason {
                        IngestError::ImmutableField { field, .. } => Some(*field),
                    };
                    let reason_key = match field {
                        Some("project") => DROP_REASON_IMMUTABLE_PROJECT,
                        Some("source_agent") => DROP_REASON_IMMUTABLE_SOURCE_AGENT,
                        _ => DROP_REASON_UNCATEGORIZED,
                    };
                    outcomes.extend(error_outcomes_for_substream(
                        substream.session_index,
                        &substream.session,
                        &substream.messages,
                        reason.to_string(),
                        field,
                        reason_key,
                    ));
                    continue;
                }
                // Compatible duplicate: fold its messages into the first occurrence,
                // keeping only message ids not already buffered for this session.
                let existing = &mut merged[existing_idx];
                let mut seen: std::collections::HashSet<String> = existing
                    .messages
                    .iter()
                    .map(|m| m.message.id().to_owned())
                    .collect();
                for msg in substream.messages {
                    if seen.insert(msg.message.id().to_owned()) {
                        existing.messages.push(msg);
                    }
                }
                continue;
            }
            by_session_id.insert(substream.session.id.clone(), merged.len());
            merged.push(substream);
        }

        // Pre-scan existing rows for these sessions. Since `substreams` was non-empty,
        // `merged` holds at least one entry here; the is_empty guards are defensive.
        let session_id_values: Vec<ScalarValue> = merged
            .iter()
            .map(|substream| ScalarValue::String(substream.session.id.clone()))
            .collect();
        let existing_sessions: std::collections::HashMap<String, Session> =
            if session_id_values.is_empty() {
                std::collections::HashMap::new()
            } else {
                let batch = self
                    .handle
                    .scan_batch(
                        Table::Sessions,
                        Some(&Predicate::In("id", session_id_values.clone())),
                        &[],
                    )
                    .await?;
                let mut map = std::collections::HashMap::with_capacity(batch.num_rows());
                for row in 0..batch.num_rows() {
                    let session = session_from_batch(&batch, row)?;
                    map.insert(session.id.clone(), session);
                }
                map
            };
        // (session_id, message_id) keys of messages already stored for these sessions.
        let existing_message_pks: HashSet<(String, String)> = if session_id_values.is_empty() {
            HashSet::new()
        } else {
            let batch = self
                .handle
                .scan_batch(
                    Table::Messages,
                    Some(&Predicate::In("session_id", session_id_values.clone())),
                    &["session_id", "id"],
                )
                .await?;
            let mut set = HashSet::with_capacity(batch.num_rows());
            for row in 0..batch.num_rows() {
                let sid = string(&batch, "session_id", row)?.context("session_id is null")?;
                let mid = string(&batch, "id", row)?.context("message id is null")?;
                set.insert((sid, mid));
            }
            set
        };
        // (session_id, message_id, part_id) keys of parts already stored for these sessions.
        let existing_part_pks: HashSet<(String, String, String)> = if session_id_values.is_empty() {
            HashSet::new()
        } else {
            let batch = self
                .handle
                .scan_batch(
                    Table::Parts,
                    Some(&Predicate::In("session_id", session_id_values)),
                    &["session_id", "message_id", "id"],
                )
                .await?;
            let mut set = HashSet::with_capacity(batch.num_rows());
            for row in 0..batch.num_rows() {
                let sid = string(&batch, "session_id", row)?.context("session_id is null")?;
                let mid = string(&batch, "message_id", row)?.context("message_id is null")?;
                let pid = string(&batch, "id", row)?.context("part id is null")?;
                set.insert((sid, mid, pid));
            }
            set
        };

        // Reject substreams whose stored session disagrees on immutable fields.
        let mut writeable: Vec<CompletedSubstream> = Vec::with_capacity(merged.len());
        for substream in merged {
            if let Some(existing) = existing_sessions.get(&substream.session.id)
                && let Err(failure) = ensure_immutable_match(existing, &substream.session)
            {
                let field = match &failure {
                    IngestError::ImmutableField { field, .. } => Some(*field),
                };
                let reason_key = match field {
                    Some("project") => DROP_REASON_IMMUTABLE_PROJECT,
                    Some("source_agent") => DROP_REASON_IMMUTABLE_SOURCE_AGENT,
                    _ => DROP_REASON_UNCATEGORIZED,
                };
                outcomes.extend(error_outcomes_for_substream(
                    substream.session_index,
                    &substream.session,
                    &substream.messages,
                    failure.to_string(),
                    field,
                    reason_key,
                ));
                continue;
            }
            writeable.push(substream);
        }

        if writeable.is_empty() {
            outcomes.sort_by_key(|outcome| outcome.index);
            return Ok((outcomes, counts));
        }

        // Flatten the surviving substreams into per-table rows. Every message row
        // inherits source_agent/project from its session.
        let sessions_owned: Vec<Session> = writeable
            .iter()
            .map(|substream| substream.session.clone())
            .collect();
        let message_rows: Vec<MessageBatchRow<'_>> = writeable
            .iter()
            .flat_map(|substream| {
                substream.messages.iter().map(|buffered| MessageBatchRow {
                    message: &buffered.message,
                    source_agent: &substream.session.source_agent,
                    project: &substream.session.project,
                    search_text: buffered.search_text.as_deref(),
                })
            })
            .collect();
        let part_rows: Vec<Part> = writeable
            .iter()
            .flat_map(|substream| {
                substream.messages.iter().flat_map(|buffered| {
                    buffered
                        .parts
                        .iter()
                        .map(|buffered_part| buffered_part.part.clone())
                })
            })
            .collect();

        let session_batches = sessions_batches(&sessions_owned)?;
        let message_batches = messages_batches(&message_rows)?;
        let part_batches = parts_batches(&part_rows)?;

        // The three tables are written concurrently; the first error aborts the join.
        let (_sessions_inserted, _messages_inserted, _parts_inserted) = tokio::try_join!(
            merge_insert_chunks(&self.handle, Table::Sessions, session_batches),
            merge_insert_chunks(&self.handle, Table::Messages, message_batches),
            merge_insert_chunks(&self.handle, Table::Parts, part_batches),
        )?;

        // Success outcomes are derived from the pre-write scans, not from the merge results.
        for substream in &writeable {
            outcomes.extend(success_outcomes_for_substream(
                substream.session_index,
                &substream.session,
                &substream.messages,
                &existing_sessions,
                &existing_message_pks,
                &existing_part_pks,
                &mut counts,
            ));
        }

        outcomes.sort_by_key(|outcome| outcome.index);
        Ok((outcomes, counts))
    }
677
678 pub async fn upsert_messages(
679 &self,
680 session: &Session,
681 messages: &[MessageWrite<'_>],
682 ) -> Result<Vec<UpsertStatus>> {
683 if messages.is_empty() {
684 return Ok(Vec::new());
685 }
686
687 let rows = messages
688 .iter()
689 .map(|write| MessageBatchRow {
690 message: write.message,
691 source_agent: &session.source_agent,
692 project: &session.project,
693 search_text: write.search_text,
694 })
695 .collect::<Vec<_>>();
696 let batches = messages_batches(&rows)?;
697 let inserted = merge_insert_chunks(&self.handle, Table::Messages, batches).await?;
698 Ok(statuses_from_inserted(messages.len(), inserted))
699 }
700
701 pub async fn upsert_parts(&self, parts: &[Part]) -> Result<Vec<UpsertStatus>> {
702 if parts.is_empty() {
703 return Ok(Vec::new());
704 }
705 let batches = parts_batches(parts)?;
706 let inserted = merge_insert_chunks(&self.handle, Table::Parts, batches).await?;
707 Ok(statuses_from_inserted(parts.len(), inserted))
708 }
709
710 pub async fn get_session(&self, session_id: &str) -> Result<Option<SessionWithMessages>> {
711 let Some(session) = self.find_session(session_id).await? else {
712 return Ok(None);
713 };
714 let messages = self.messages_for_session(session_id).await?;
715 Ok(Some(SessionWithMessages { session, messages }))
716 }
717
718 pub async fn session_ids(&self) -> Result<Vec<String>> {
720 let batch = self
721 .handle
722 .scan_batch(Table::Sessions, None, &["id"])
723 .await?;
724 let mut ids = Vec::with_capacity(batch.num_rows());
725 for row in 0..batch.num_rows() {
726 if let Some(id) = string(&batch, "id", row)? {
727 ids.push(id);
728 }
729 }
730 Ok(ids)
731 }
732
733 pub async fn child_sessions(&self, parent_session_id: &str) -> Result<Vec<Session>> {
734 let batch = self
735 .handle
736 .scan_batch(
737 Table::Sessions,
738 Some(&Predicate::Eq(
739 "parent_session_id",
740 parent_session_id.into(),
741 )),
742 &[
743 "id",
744 "parent_session_id",
745 "parent_message_id",
746 "source_agent",
747 "created_at",
748 "project",
749 "options",
750 ],
751 )
752 .await?;
753 let mut sessions = Vec::with_capacity(batch.num_rows());
754 for row in 0..batch.num_rows() {
755 sessions.push(session_from_batch(&batch, row)?);
756 }
757 sessions.sort_by(|left, right| left.id.cmp(&right.id));
758 Ok(sessions)
759 }
760
761 pub async fn session_last_ingested_at(&self) -> Result<HashMap<String, DateTime<Utc>>> {
767 use lance::deps::arrow_array::UInt64Array;
768
769 let dataset = self.handle.dataset(Table::Sessions).await?;
770 let version_list = dataset.versions().await?;
771 let versions: HashMap<u64, DateTime<Utc>> = version_list
772 .iter()
773 .map(|v| (v.version, v.timestamp))
774 .collect();
775 let oldest_visible_ts = version_list.iter().map(|v| v.timestamp).min();
783
784 let scanner = self
785 .handle
786 .scan(
787 Table::Sessions,
788 ScanOpts::project_only(&["id", "_row_last_updated_at_version"]),
789 )
790 .await?;
791 let mut stream = scanner.try_into_stream().await?;
792 let mut out: HashMap<String, DateTime<Utc>> = HashMap::new();
793 while let Some(batch) = stream.next().await {
794 let batch = batch?;
795 let version_array = batch
796 .column_by_name("_row_last_updated_at_version")
797 .context("missing _row_last_updated_at_version column")?
798 .as_any()
799 .downcast_ref::<UInt64Array>()
800 .context("_row_last_updated_at_version is not UInt64")?;
801 for row in 0..batch.num_rows() {
802 let Some(id) = string(&batch, "id", row)? else {
803 continue;
804 };
805 if version_array.is_null(row) {
806 continue;
807 }
808 let version = version_array.value(row);
809 let ts = versions.get(&version).copied().or(oldest_visible_ts);
810 if let Some(ts) = ts {
811 out.insert(id, ts);
812 }
813 }
814 }
815 Ok(out)
816 }
817
    /// Returns one page of a session's messages.
    ///
    /// Message source depends on `params.mode`:
    /// - `Conversational`: only messages with non-null `search_text`
    ///   (`content` is always `None`);
    /// - `Complete` / `Verbatim`: every message, including `content`.
    ///
    /// Rows are ordered by `(timestamp, id)`. Paging starts after the message
    /// named by `params.after_id` (if any), then:
    /// - `SessionFrom::Start` takes rows from the front via `page_by`, bounded by
    ///   `params.limit` and `params.budget_bytes` (size = `text` length in bytes);
    /// - `SessionFrom::End` takes up to `params.limit` rows from the back. It stops
    ///   before exceeding `params.budget_bytes`, but always includes at least one
    ///   row when `limit > 0`.
    ///
    /// `messages_remaining` counts the rows after `after_id` that were not
    /// emitted. Emitted messages carry all parts in `Verbatim` mode and only
    /// summary parts otherwise.
    ///
    /// Returns `GetLookup::NotFound` for an unknown session and
    /// `GetLookup::UnknownAfterId` when `after_id` is not among the scanned rows.
    pub async fn session_view(
        &self,
        session_id: &str,
        params: SessionViewParams<'_>,
    ) -> Result<GetLookup<SessionPage>> {
        let Some(session) = self.find_session(session_id).await? else {
            return Ok(GetLookup::NotFound);
        };

        let mut rows = match params.mode {
            ResponseMode::Conversational => self
                .scan_conversational_messages(session_id)
                .await?
                .into_iter()
                .map(|row| ScanRow {
                    id: row.message_id,
                    role: row.role,
                    timestamp: row.timestamp,
                    text: Some(row.text.into_inner()),
                    content: None,
                })
                .collect(),
            ResponseMode::Complete | ResponseMode::Verbatim => {
                self.scan_all_messages(session_id).await?
            }
        };
        rows.sort_by(|a, b| a.timestamp.cmp(&b.timestamp).then_with(|| a.id.cmp(&b.id)));

        // Resume cursor: start just after the `after_id` message.
        let start_at = match params.after_id {
            Some(after) => match rows.iter().position(|row| row.id == after) {
                Some(idx) => idx + 1,
                None => return Ok(GetLookup::UnknownAfterId),
            },
            None => 0,
        };
        let remaining = rows.get(start_at..).unwrap_or(&[]);
        let (emitted, messages_remaining) = match params.session_from {
            SessionFrom::Start => {
                let n = page_by(remaining, params.limit, params.budget_bytes, |row| {
                    row.text.as_deref().map_or(0, str::len)
                });
                (&remaining[..n], remaining.len() - n)
            }
            SessionFrom::End => {
                // Walk backwards, moving `start` left one row at a time. The budget
                // check is skipped for the first row taken (start == len), so at
                // least one row is emitted whenever limit > 0.
                let mut bytes = 0usize;
                let mut start = remaining.len();
                for row in remaining.iter().rev() {
                    if remaining.len() - start >= params.limit {
                        break;
                    }
                    let size = row.text.as_deref().map_or(0, str::len);
                    if start < remaining.len() && bytes + size > params.budget_bytes {
                        break;
                    }
                    bytes += size;
                    start -= 1;
                }
                // Rows before `start` are the ones still remaining.
                (&remaining[start..], start)
            }
        };
        let ids: Vec<String> = emitted.iter().map(|row| row.id.clone()).collect();

        // Parts are keyed by (session_id, message_id).
        let mut parts_by_message = match params.mode {
            ResponseMode::Verbatim => self.parts_for_messages(session_id, &ids).await?,
            ResponseMode::Conversational | ResponseMode::Complete => {
                self.summary_parts_for_messages(session_id, &ids).await?
            }
        };
        let messages = emitted
            .iter()
            .map(|row| RetrievedMessage {
                id: row.id.clone(),
                role: row.role,
                timestamp: row.timestamp,
                text: row.text.clone(),
                content: row.content.clone(),
                parts: parts_by_message
                    .remove(&(session_id.to_owned(), row.id.clone()))
                    .unwrap_or_default(),
            })
            .collect();

        Ok(GetLookup::Found(SessionPage {
            session,
            messages,
            messages_remaining,
        }))
    }
919
920 pub async fn message_view(
926 &self,
927 message_id: &str,
928 params: MessageViewParams<'_>,
929 ) -> Result<GetLookup<MessagePage>> {
930 let Some(session_id) = self.session_id_for_message(message_id).await? else {
931 return Ok(GetLookup::NotFound);
932 };
933 let Some(session) = self.find_session(&session_id).await? else {
934 return Ok(GetLookup::NotFound);
935 };
936 let mut rows = self.scan_all_messages(&session_id).await?;
937 rows.sort_by(|a, b| a.timestamp.cmp(&b.timestamp).then_with(|| a.id.cmp(&b.id)));
938 let Some(target_pos) = rows.iter().position(|row| row.id == message_id) else {
939 return Ok(GetLookup::NotFound);
940 };
941
942 let start = target_pos.saturating_sub(params.context_depth);
943 let end = (target_pos + params.context_depth + 1).min(rows.len());
944 let window = &rows[start..end];
945 let window_ids: Vec<String> = window.iter().map(|row| row.id.clone()).collect();
946 let mut parts_by_message = self.parts_for_messages(&session_id, &window_ids).await?;
949
950 let all_parts = parts_by_message
951 .remove(&(session_id.clone(), message_id.to_owned()))
952 .unwrap_or_default();
953 let start_part = match params.after_id {
954 Some(after) => match all_parts.iter().find(|part| part.id == after) {
958 Some(anchor) => all_parts
959 .iter()
960 .position(|part| part.ordinal > anchor.ordinal)
961 .unwrap_or(all_parts.len()),
962 None => return Ok(GetLookup::UnknownAfterId),
963 },
964 None => 0,
965 };
966 let remaining_parts = all_parts.get(start_part..).unwrap_or(&[]);
967 let part_count = page_by(remaining_parts, params.limit, params.budget_bytes, |part| {
968 serde_json::to_string(part).map_or(0, |json| json.len())
969 });
970 let target_parts = remaining_parts[..part_count].to_vec();
971 let target_parts_remaining = remaining_parts.len() - part_count;
972
973 let target_row = &rows[target_pos];
974 let target = RetrievedMessage {
975 id: target_row.id.clone(),
976 role: target_row.role,
977 timestamp: target_row.timestamp,
978 text: target_row.text.clone(),
979 content: target_row.content.clone(),
980 parts: Vec::new(),
982 };
983 let siblings = window
984 .iter()
985 .enumerate()
986 .filter(|(idx, _)| start + idx != target_pos)
987 .map(|(_, row)| RetrievedMessage {
988 id: row.id.clone(),
989 role: row.role,
990 timestamp: row.timestamp,
991 text: row.text.clone(),
992 content: row.content.clone(),
993 parts: parts_by_message
994 .get(&(session_id.clone(), row.id.clone()))
995 .cloned()
996 .unwrap_or_default(),
997 })
998 .collect();
999
1000 Ok(GetLookup::Found(MessagePage {
1001 session,
1002 target,
1003 target_parts,
1004 target_parts_remaining,
1005 siblings,
1006 }))
1007 }
1008
1009 async fn scan_all_messages(&self, session_id: &str) -> Result<Vec<ScanRow>> {
1010 let batch = self
1011 .handle
1012 .scan_batch(
1013 Table::Messages,
1014 Some(&Predicate::Eq("session_id", session_id.into())),
1015 &["id", "timestamp", "role", "search_text", "content"],
1016 )
1017 .await?;
1018 let mut rows = Vec::with_capacity(batch.num_rows());
1019 for row in 0..batch.num_rows() {
1020 let id = string(&batch, "id", row)?.context("message id is null")?;
1021 let role =
1022 role_from_str(&string(&batch, "role", row)?.context("message role is null")?)?;
1023 let timestamp = datetime(&batch, "timestamp", row)?;
1024 rows.push(ScanRow {
1025 id,
1026 role,
1027 timestamp,
1028 text: string(&batch, "search_text", row)?,
1029 content: string(&batch, "content", row)?,
1030 });
1031 }
1032 Ok(rows)
1033 }
1034
1035 pub async fn scan_conversational_messages(
1039 &self,
1040 session_id: &str,
1041 ) -> Result<Vec<ConversationalRow>> {
1042 let filter = Predicate::And(vec![
1043 Predicate::Eq("session_id", session_id.into()),
1044 Predicate::IsNotNull("search_text"),
1045 ]);
1046 let batch = self
1047 .handle
1048 .scan_batch(
1049 Table::Messages,
1050 Some(&filter),
1051 &["id", "timestamp", "role", "search_text"],
1052 )
1053 .await?;
1054
1055 let mut rows = Vec::with_capacity(batch.num_rows());
1056 for row in 0..batch.num_rows() {
1057 let message_id = string(&batch, "id", row)?.context("message id is null")?;
1058 let role =
1059 role_from_str(&string(&batch, "role", row)?.context("message role is null")?)?;
1060 let timestamp = datetime(&batch, "timestamp", row)?;
1061 let text_str = string(&batch, "search_text", row)?.context(
1062 "search_text null after IsNotNull pushdown - storage invariant violated",
1063 )?;
1064 rows.push(ConversationalRow {
1065 session_id: session_id.to_owned(),
1066 message_id,
1067 role,
1068 timestamp,
1069 text: SearchText(text_str),
1070 });
1071 }
1072 rows.sort_by(|a, b| {
1073 a.timestamp
1074 .cmp(&b.timestamp)
1075 .then_with(|| a.message_id.cmp(&b.message_id))
1076 });
1077 Ok(rows)
1078 }
1079
1080 pub async fn session_id_for_message(&self, message_id: &str) -> Result<Option<String>> {
1083 let batch = self
1084 .handle
1085 .scan_batch(
1086 Table::Messages,
1087 Some(&Predicate::Eq("id", message_id.into())),
1088 &["session_id"],
1089 )
1090 .await?;
1091 if batch.num_rows() == 0 {
1092 return Ok(None);
1093 }
1094 string(&batch, "session_id", 0)
1095 }
1096
1097 pub async fn row_counts(&self) -> Result<(usize, usize, usize)> {
1098 self.handle.row_counts().await
1099 }
1100
1101 pub async fn corpus_stats(&self, include_subagents: bool) -> Result<CorpusStats> {
1107 let scanner = self
1108 .handle
1109 .scan(
1110 Table::Messages,
1111 ScanOpts::project_only(&["source_agent", "project", "session_id"]),
1112 )
1113 .await?;
1114 let mut stream = scanner.try_into_stream().await?;
1115 let mut groups: HashMap<(String, String), GroupAccumulator> = HashMap::new();
1116 while let Some(batch) = stream.next().await {
1117 let batch = batch?;
1118 for row in 0..batch.num_rows() {
1119 let source_agent = string(&batch, "source_agent", row)?.unwrap_or_default();
1120 let project = string(&batch, "project", row)?.unwrap_or_default();
1121 let session_id = string(&batch, "session_id", row)?.unwrap_or_default();
1122 let is_subagent = source_agent.contains('/');
1123 if is_subagent && !include_subagents {
1124 continue;
1125 }
1126 let entry = groups.entry((source_agent, project)).or_default();
1127 entry.messages += 1;
1128 entry.session_ids.insert(session_id);
1129 }
1130 }
1131
1132 let (totals_sessions, totals_messages, totals_parts) = self.handle.row_counts().await?;
1133 let totals = RowTotals {
1134 sessions: totals_sessions as u64,
1135 messages: totals_messages as u64,
1136 parts: totals_parts as u64,
1137 };
1138
1139 let mut by_adapter: BTreeMap<String, Vec<ProjectStats>> = BTreeMap::new();
1140 for ((adapter, project), acc) in groups {
1141 by_adapter.entry(adapter).or_default().push(ProjectStats {
1142 project,
1143 sessions: acc.session_ids.len() as u64,
1144 messages: acc.messages,
1145 });
1146 }
1147
1148 let mut adapters = Vec::with_capacity(by_adapter.len());
1149 for (adapter, mut projects) in by_adapter {
1150 projects.sort_by(|a, b| {
1151 b.messages
1152 .cmp(&a.messages)
1153 .then_with(|| a.project.cmp(&b.project))
1154 });
1155 let sessions: u64 = projects.iter().map(|p| p.sessions).sum();
1156 let messages: u64 = projects.iter().map(|p| p.messages).sum();
1157 adapters.push(AdapterStats {
1158 adapter,
1159 sessions,
1160 messages,
1161 projects,
1162 });
1163 }
1164
1165 Ok(CorpusStats {
1166 data_url: self.handle.location().clone(),
1167 totals,
1168 adapters,
1169 include_subagents,
1170 })
1171 }
1172
1173 pub async fn write_embeddings(&self, rows: &[EmbeddedMessage]) -> Result<()> {
1178 if rows.is_empty() {
1179 return Ok(());
1180 }
1181 let batch = embedding_update_batch(rows)?;
1182 self.handle
1183 .merge_update(Table::Messages, batch, rows.len())
1184 .await?;
1185 Ok(())
1186 }
1187
1188 pub fn pending_embedding_messages(&self) -> impl Stream<Item = Result<PendingMessage>> + '_ {
1191 try_stream! {
1192 let filter = Predicate::And(vec![
1193 Predicate::IsNull("vector"),
1194 Predicate::IsNotNull("search_text"),
1195 ]);
1196 let projection: &[&str] = &["session_id", "id", "search_text"];
1197 let scanner = self
1198 .handle
1199 .scan(
1200 Table::Messages,
1201 ScanOpts::with_predicate_and_projection(&filter, projection),
1202 )
1203 .await?;
1204 let mut batches = scanner
1205 .try_into_stream()
1206 .await
1207 .context("failed to open messages stream")?;
1208 while let Some(batch) = batches.next().await {
1209 let batch = batch?;
1210 for row in 0..batch.num_rows() {
1211 yield PendingMessage {
1212 session_id: string(&batch, "session_id", row)?
1213 .context("session_id is null")?,
1214 id: string(&batch, "id", row)?.context("message id is null")?,
1215 search_text: string(&batch, "search_text", row)?
1216 .context("search_text is null")?,
1217 };
1218 }
1219 }
1220 }
1221 }
1222
1223 pub fn pending_or_stale_messages(&self) -> impl Stream<Item = Result<PendingMessage>> + '_ {
1228 try_stream! {
1229 let filter = Predicate::And(vec![
1230 Predicate::IsNotNull("search_text"),
1231 Predicate::Or(vec![
1232 Predicate::IsNull("vector"),
1233 Predicate::Ne("embedding_model", embed::model_id().into()),
1234 ]),
1235 ]);
1236 let projection: &[&str] = &["session_id", "id", "search_text"];
1237 let scanner = self
1238 .handle
1239 .scan(
1240 Table::Messages,
1241 ScanOpts::with_predicate_and_projection(&filter, projection),
1242 )
1243 .await?;
1244 let mut batches = scanner
1245 .try_into_stream()
1246 .await
1247 .context("failed to open pending-or-stale messages stream")?;
1248 while let Some(batch) = batches.next().await {
1249 let batch = batch?;
1250 for row in 0..batch.num_rows() {
1251 yield PendingMessage {
1252 session_id: string(&batch, "session_id", row)?
1253 .context("session_id is null")?,
1254 id: string(&batch, "id", row)?.context("message id is null")?,
1255 search_text: string(&batch, "search_text", row)?
1256 .context("search_text is null")?,
1257 };
1258 }
1259 }
1260 }
1261 }
1262
1263 pub async fn fts_search(
1265 &self,
1266 query: &str,
1267 limit: usize,
1268 filter: &Predicate,
1269 ) -> Result<Vec<(MessageKey, f32)>> {
1270 let mut scanner = self.handle.scanner(Table::Messages, Some(filter)).await?;
1271 scanner.full_text_search(
1272 FullTextSearchQuery::new(query.to_owned()).with_column("search_text".to_owned())?,
1273 )?;
1274 scanner.disable_scoring_autoprojection();
1280 scanner.project(&["session_id", "id", "_score"])?;
1281 scanner.limit(Some(i64::try_from(limit).unwrap_or(i64::MAX)), None)?;
1282 let batch = scanner.try_into_batch().await?;
1283 let mut hits = Vec::with_capacity(batch.num_rows());
1284 for row in 0..batch.num_rows() {
1285 let key = MessageKey {
1286 session_id: string(&batch, "session_id", row)?.context("session_id is null")?,
1287 message_id: string(&batch, "id", row)?.context("fts hit id is null")?,
1288 };
1289 hits.push((key, float32(&batch, "_score", row)?));
1290 }
1291 hits.sort_by(|left, right| {
1299 right
1300 .1
1301 .partial_cmp(&left.1)
1302 .unwrap_or(std::cmp::Ordering::Equal)
1303 .then_with(|| left.0.session_id.cmp(&right.0.session_id))
1304 .then_with(|| left.0.message_id.cmp(&right.0.message_id))
1305 });
1306 Ok(hits)
1307 }
1308
1309 pub async fn has_embeddings(&self) -> Result<bool> {
1314 let scope = Predicate::IsNotNull("vector");
1315 let mut scanner = self
1316 .handle
1317 .scan(
1318 Table::Messages,
1319 ScanOpts::with_predicate_and_projection(&scope, &["id"]),
1320 )
1321 .await?;
1322 scanner.limit(Some(1), None)?;
1323 let batch = scanner.try_into_batch().await?;
1324 Ok(batch.num_rows() > 0)
1325 }
1326
1327 pub async fn vector_search(
1335 &self,
1336 query: &[f32],
1337 limit: usize,
1338 filter: &Predicate,
1339 search: Option<&config::SearchConfig>,
1340 ) -> Result<Vec<(MessageKey, f32)>> {
1341 let scope = embedded_scope(filter);
1342 let mut scanner = self.handle.scanner(Table::Messages, Some(&scope)).await?;
1343 let key = Float32Array::from(query.to_vec());
1344 scanner.nearest("vector", &key, limit)?;
1345 if let Some(nprobes) = search.and_then(|cfg| cfg.nprobes) {
1346 scanner.nprobes(nprobes);
1347 }
1348 if let Some(refine_factor) = search.and_then(|cfg| cfg.refine_factor) {
1349 scanner.refine(refine_factor);
1350 }
1351 scanner.disable_scoring_autoprojection();
1355 scanner.project(&["session_id", "id", "_distance"])?;
1356 let batch = scanner.try_into_batch().await?;
1357 let mut hits = Vec::with_capacity(batch.num_rows());
1358 for row in 0..batch.num_rows() {
1359 let key = MessageKey {
1360 session_id: string(&batch, "session_id", row)?.context("session_id is null")?,
1361 message_id: string(&batch, "id", row)?.context("message id is null")?,
1362 };
1363 hits.push((key, float32(&batch, "_distance", row)?));
1364 }
1365 hits.sort_by(|left, right| {
1371 left.1
1372 .partial_cmp(&right.1)
1373 .unwrap_or(std::cmp::Ordering::Equal)
1374 .then_with(|| left.0.session_id.cmp(&right.0.session_id))
1375 .then_with(|| left.0.message_id.cmp(&right.0.message_id))
1376 });
1377 Ok(hits)
1378 }
1379
1380 pub async fn explain_vector_plan(
1383 &self,
1384 query: &[f32],
1385 limit: usize,
1386 filter: &Predicate,
1387 search: Option<&config::SearchConfig>,
1388 ) -> Result<String> {
1389 let scope = embedded_scope(filter);
1390 let mut scanner = self.handle.scanner(Table::Messages, Some(&scope)).await?;
1391 let key = Float32Array::from(query.to_vec());
1392 scanner.nearest("vector", &key, limit)?;
1393 if let Some(nprobes) = search.and_then(|cfg| cfg.nprobes) {
1394 scanner.nprobes(nprobes);
1395 }
1396 if let Some(refine_factor) = search.and_then(|cfg| cfg.refine_factor) {
1397 scanner.refine(refine_factor);
1398 }
1399 scanner
1400 .explain_plan(true)
1401 .await
1402 .context("explain_plan failed")
1403 }
1404
1405 pub async fn explain_fts_plan(
1406 &self,
1407 query: &str,
1408 limit: usize,
1409 filter: &Predicate,
1410 ) -> Result<String> {
1411 let mut scanner = self.handle.scanner(Table::Messages, Some(filter)).await?;
1412 scanner.full_text_search(
1413 FullTextSearchQuery::new(query.to_owned()).with_column("search_text".to_owned())?,
1414 )?;
1415 scanner.project(&["session_id", "id"])?;
1416 scanner.limit(Some(i64::try_from(limit).unwrap_or(i64::MAX)), None)?;
1417 scanner
1418 .explain_plan(true)
1419 .await
1420 .context("explain_plan failed")
1421 }
1422
1423 pub async fn message_metas_by_keys(&self, keys: &[MessageKey]) -> Result<Vec<MessageMeta>> {
1425 if keys.is_empty() {
1426 return Ok(Vec::new());
1427 }
1428 let wanted = keys.iter().cloned().collect::<HashSet<_>>();
1429 let session_ids = keys
1430 .iter()
1431 .map(|key| key.session_id.clone())
1432 .collect::<Vec<_>>();
1433 let message_ids = keys
1434 .iter()
1435 .map(|key| key.message_id.clone())
1436 .collect::<Vec<_>>();
1437 let predicate = Predicate::And(vec![
1438 in_predicate("session_id", &session_ids),
1439 in_predicate("id", &message_ids),
1440 ]);
1441 let batch = self
1442 .handle
1443 .scan_batch(
1444 Table::Messages,
1445 Some(&predicate),
1446 &[
1447 "id",
1448 "session_id",
1449 "role",
1450 "project",
1451 "source_agent",
1452 "timestamp",
1453 "search_text",
1454 ],
1455 )
1456 .await?;
1457 let mut metas = Vec::with_capacity(batch.num_rows());
1458 for row in 0..batch.num_rows() {
1459 let message_id = string(&batch, "id", row)?.context("id is null")?;
1460 let session_id = string(&batch, "session_id", row)?.context("session_id is null")?;
1461 if !wanted.contains(&MessageKey {
1462 session_id: session_id.clone(),
1463 message_id: message_id.clone(),
1464 }) {
1465 continue;
1466 }
1467 metas.push(MessageMeta {
1468 message_id,
1469 session_id,
1470 role: string(&batch, "role", row)?.context("role is null")?,
1471 project: string(&batch, "project", row)?.context("project is null")?,
1472 source_agent: string(&batch, "source_agent", row)?
1473 .context("source_agent is null")?,
1474 timestamp: datetime(&batch, "timestamp", row)?,
1475 search_text: string(&batch, "search_text", row)?.unwrap_or_default(),
1476 });
1477 }
1478 Ok(metas)
1479 }
1480
1481 pub async fn session_message_counts(
1483 &self,
1484 session_ids: &[String],
1485 ) -> Result<BTreeMap<String, usize>> {
1486 if session_ids.is_empty() {
1487 return Ok(BTreeMap::new());
1488 }
1489 let dataset = self.handle.dataset(Table::Messages).await?;
1490 let mut tasks = tokio::task::JoinSet::new();
1491 for session_id in session_ids {
1492 let dataset = dataset.clone();
1493 let session_id = session_id.clone();
1494 tasks.spawn(async move {
1495 let filter = Predicate::Eq("session_id", session_id.as_str().into()).to_lance();
1496 let count = dataset.count_rows(Some(filter)).await?;
1497 anyhow::Ok((session_id, count))
1498 });
1499 }
1500 let mut counts = BTreeMap::new();
1501 while let Some(joined) = tasks.join_next().await {
1502 let (session_id, count) = joined.context("session count task panicked")??;
1503 counts.insert(session_id, count);
1504 }
1505 Ok(counts)
1506 }
1507
1508 pub async fn unindexed_message_backlog(&self) -> Result<usize> {
1511 self.handle
1512 .unindexed_row_count(Table::Messages, MESSAGES_FTS_INDEX)
1513 .await
1514 }
1515
1516 pub async fn unindexed_vector_backlog(&self) -> Result<usize> {
1522 self.handle
1523 .unindexed_row_count(Table::Messages, MESSAGES_VECTOR_INDEX)
1524 .await
1525 }
1526
1527 pub async fn embedding_progress(&self) -> Result<EmbeddingProgress> {
1534 let dataset = self.handle.dataset(Table::Messages).await?;
1535 let embedded = dataset
1536 .count_rows(Some(Predicate::IsNotNull("vector").to_lance()))
1537 .await?;
1538 let total = dataset
1539 .count_rows(Some(Predicate::IsNotNull("search_text").to_lance()))
1540 .await?;
1541 Ok(EmbeddingProgress {
1542 embedded,
1543 total,
1544 model: embed::model_id(),
1545 })
1546 }
1547
1548 pub async fn stale_embedding_count(&self) -> Result<usize> {
1552 let dataset = self.handle.dataset(Table::Messages).await?;
1553 dataset
1554 .count_rows(Some(
1555 Predicate::And(vec![
1556 Predicate::IsNotNull("vector"),
1557 Predicate::Ne("embedding_model", embed::model_id().into()),
1558 ])
1559 .to_lance(),
1560 ))
1561 .await
1562 .map_err(Into::into)
1563 }
1564
1565 pub async fn optimize_indices(
1571 &self,
1572 progress: Option<OptimizeProgressFn>,
1573 maintenance: &MaintenancePolicy,
1574 ) -> Result<OptimizeOutcome> {
1575 let intents = pond_index_intents();
1576 let mut tables = Vec::with_capacity(3);
1577 for (table, intents) in intents.all() {
1578 let outcome = self
1579 .handle
1580 .optimize_table(table, intents, progress.as_ref(), maintenance)
1581 .await;
1582 tables.push(outcome);
1583 }
1584 Ok(OptimizeOutcome { tables })
1585 }
1586
1587 pub async fn build_indices_only(
1593 &self,
1594 progress: Option<OptimizeProgressFn>,
1595 ) -> Result<OptimizeOutcome> {
1596 let policy = pond_index_intents();
1597 let mut tables = Vec::with_capacity(3);
1598 for (table, intents) in policy.all() {
1599 let indices = self
1600 .handle
1601 .optimize_table_indices_only(table, intents, progress.as_ref())
1602 .await;
1603 tables.push(TableOptimizeOutcome {
1604 table,
1605 indices,
1606 compaction: PhaseOutcome::NotAttempted,
1607 });
1608 }
1609 Ok(OptimizeOutcome { tables })
1610 }
1611
/// Test-only variant of `optimize_indices`.
///
/// It uses a custom vector-index activation threshold and always compacts.
/// It reports no progress.
#[cfg(test)]
async fn optimize_indices_with_vector_threshold(
    &self,
    vector_threshold: usize,
) -> Result<OptimizeOutcome> {
    let plan = pond_index_intents_with_vector_threshold(vector_threshold);
    let maintenance = MaintenancePolicy::always_compact();
    let mut tables = Vec::with_capacity(3);
    for (table, table_intents) in plan.all() {
        tables.push(
            self.handle
                .optimize_table(table, table_intents, None, &maintenance)
                .await,
        );
    }
    Ok(OptimizeOutcome { tables })
}
1629
1630 pub async fn rebuild_indices(&self, intent_name: Option<&str>) -> Result<()> {
1631 let policy = pond_index_intents();
1632 let mut matched = false;
1633 for (table, intents) in policy.all() {
1634 for intent in intents {
1635 if intent_name.is_none_or(|name| name == intent.name) {
1636 matched = true;
1637 self.handle.rebuild_index(table, intent).await?;
1638 }
1639 }
1640 }
1641 if let Some(name) = intent_name
1642 && !matched
1643 {
1644 anyhow::bail!("unknown index intent {name:?}");
1645 }
1646 Ok(())
1647 }
1648
1649 pub async fn index_status(&self) -> Result<Vec<IndexStatus>> {
1650 let policy = pond_index_intents();
1651 let mut statuses = Vec::new();
1652 for (table, intents) in policy.all() {
1653 statuses.extend(self.handle.index_status(table, intents).await?);
1654 }
1655 Ok(statuses)
1656 }
1657
1658 pub async fn drop_vector_index(&self) -> Result<()> {
1662 match self
1663 .handle
1664 .drop_index(Table::Messages, MESSAGES_VECTOR_INDEX)
1665 .await
1666 {
1667 Ok(()) => Ok(()),
1668 Err(error) => {
1669 let msg = error.to_string();
1670 if msg.contains("not found") || msg.contains("does not exist") {
1671 Ok(())
1672 } else {
1673 Err(error)
1674 }
1675 }
1676 }
1677 }
1678
1679 pub async fn table_sizes(&self) -> Result<TableSizes> {
1682 self.handle.table_sizes().await
1683 }
1684
1685 async fn find_session(&self, session_id: &str) -> Result<Option<Session>> {
1686 let batch = self
1687 .handle
1688 .scan_batch(
1689 Table::Sessions,
1690 Some(&Predicate::Eq("id", session_id.into())),
1691 &[],
1692 )
1693 .await?;
1694 if batch.num_rows() == 0 {
1695 Ok(None)
1696 } else {
1697 Ok(Some(session_from_batch(&batch, 0)?))
1698 }
1699 }
1700
1701 pub async fn message_vector_by_id(&self, message_id: &str) -> Result<Option<Vec<f32>>> {
1707 let batch = self
1708 .handle
1709 .scan_batch(
1710 Table::Messages,
1711 Some(&Predicate::Eq("id", message_id.into())),
1712 &["vector"],
1713 )
1714 .await?;
1715 if batch.num_rows() == 0 {
1716 return Ok(None);
1717 }
1718 let column = batch
1719 .column(0)
1720 .as_any()
1721 .downcast_ref::<FixedSizeListArray>();
1722 let Some(list) = column else {
1723 return Ok(None);
1724 };
1725 if list.is_null(0) {
1726 return Ok(None);
1727 }
1728 let values = list.value(0);
1729 let halves = values
1730 .as_any()
1731 .downcast_ref::<Float16Array>()
1732 .context("messages.vector inner array is not Float16")?;
1733 let widened = (0..halves.len())
1734 .map(|i| halves.value(i).to_f32())
1735 .collect();
1736 Ok(Some(widened))
1737 }
1738
1739 async fn messages_for_session(&self, session_id: &str) -> Result<Vec<MessageWithParts>> {
1740 let batch = self
1741 .handle
1742 .scan_batch(
1743 Table::Messages,
1744 Some(&Predicate::Eq("session_id", session_id.into())),
1745 &[
1746 "session_id",
1747 "id",
1748 "timestamp",
1749 "role",
1750 "content",
1751 "options",
1752 ],
1753 )
1754 .await?;
1755 let mut messages = Vec::with_capacity(batch.num_rows());
1756 for row in 0..batch.num_rows() {
1757 messages.push(message_from_batch(&batch, row)?);
1758 }
1759 messages.sort_by(|left, right| {
1760 left.timestamp()
1761 .cmp(&right.timestamp())
1762 .then_with(|| left.id().cmp(right.id()))
1763 });
1764
1765 let message_ids = messages
1766 .iter()
1767 .map(|message| message.id().to_owned())
1768 .collect::<Vec<_>>();
1769 let mut parts_by_message = self.parts_for_messages(session_id, &message_ids).await?;
1770
1771 Ok(messages
1772 .into_iter()
1773 .map(|message| {
1774 let key = (message.session_id().to_owned(), message.id().to_owned());
1775 let parts = parts_by_message.remove(&key).unwrap_or_default();
1776 MessageWithParts { message, parts }
1777 })
1778 .collect())
1779 }
1780
1781 pub async fn parts_for_messages(
1785 &self,
1786 session_id: &str,
1787 message_ids: &[String],
1788 ) -> Result<BTreeMap<(String, String), Vec<Part>>> {
1789 self.scan_parts(session_id, message_ids, None).await
1790 }
1791
1792 pub async fn summary_parts_for_messages(
1797 &self,
1798 session_id: &str,
1799 message_ids: &[String],
1800 ) -> Result<BTreeMap<(String, String), Vec<Part>>> {
1801 self.scan_parts(session_id, message_ids, Some(SUMMARY_PART_TYPES))
1802 .await
1803 }
1804
1805 async fn scan_parts(
1806 &self,
1807 session_id: &str,
1808 message_ids: &[String],
1809 part_types: Option<&[&str]>,
1810 ) -> Result<BTreeMap<(String, String), Vec<Part>>> {
1811 if message_ids.is_empty() {
1812 return Ok(BTreeMap::new());
1813 }
1814 let mut clauses = vec![
1815 Predicate::Eq("session_id", session_id.into()),
1816 in_predicate("message_id", message_ids),
1817 ];
1818 if let Some(types) = part_types {
1819 clauses.push(Predicate::In(
1820 "type",
1821 types.iter().map(|&t| t.into()).collect(),
1822 ));
1823 }
1824 let predicate = Predicate::And(clauses);
1825 let dataset = std::sync::Arc::new(self.handle.dataset(Table::Parts).await?);
1826 let mut scanner = self
1827 .handle
1828 .scan(
1829 Table::Parts,
1830 ScanOpts::with_predicate_and_projection(
1831 &predicate,
1832 &[
1833 "session_id",
1834 "message_id",
1835 "id",
1836 "ordinal",
1837 "type",
1838 "provenance",
1839 "variant_data",
1840 "options",
1841 ],
1842 ),
1843 )
1844 .await?;
1845 scanner.with_row_address();
1846 let batch = scanner.try_into_batch().await.context("scan failed")?;
1847 let row_addresses = uint64(&batch, "_rowaddr")?;
1848 let mut file_payloads = BTreeMap::<usize, FileData>::new();
1849 let mut file_rows = Vec::<(usize, u64, Vec<u8>)>::new();
1850 for row in 0..batch.num_rows() {
1851 if string(&batch, "type", row)?.as_deref() == Some("file") {
1852 let variant_data =
1853 json_column(&batch, "variant_data", row)?.context("variant_data is null")?;
1854 file_rows.push((row, row_addresses.value(row), variant_data));
1855 }
1856 }
1857 if !file_rows.is_empty() {
1858 let addresses = file_rows
1859 .iter()
1860 .map(|(_, address, _)| *address)
1861 .collect::<Vec<_>>();
1862 let blobs = dataset.take_blobs_by_addresses(&addresses, "data").await?;
1863 for ((row, _, variant_data), blob) in file_rows.into_iter().zip(blobs) {
1864 let payload = file_data_from_blob(&variant_data, &blob.read().await?)?;
1868 file_payloads.insert(row, payload);
1869 }
1870 }
1871 let mut parts_by_message = BTreeMap::<(String, String), Vec<Part>>::new();
1872 for row in 0..batch.num_rows() {
1873 let part = part_from_batch(&batch, row, file_payloads.remove(&row))?;
1874 parts_by_message
1875 .entry((part.session_id.clone(), part.message_id.clone()))
1876 .or_default()
1877 .push(part);
1878 }
1879 for parts in parts_by_message.values_mut() {
1880 parts.sort_by_key(|part| part.ordinal);
1881 }
1882 Ok(parts_by_message)
1883 }
1884}
1885
1886#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
1887#[serde(tag = "kind", content = "data", rename_all = "snake_case")]
1888pub enum IngestEvent {
1889 Session(Session),
1890 Message(Message),
1891 Part(Part),
1892}
1893
/// Running totals for one ingest run.
///
/// Counters are accumulated through `add_batch`, `add_outcomes`,
/// `add_outcomes_errors_only` and `merge`.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct IngestSummary {
    /// Rows newly inserted, summed over sessions, messages and parts.
    pub inserted: usize,
    /// Rows that were already present, summed over sessions, messages and parts.
    pub matched: usize,
    /// Session rows newly inserted.
    pub sessions_inserted: usize,
    /// Message rows newly inserted, searchable or not.
    pub messages_inserted_total: usize,
    /// Subset of `messages_inserted_total` that carries search text.
    pub messages_inserted_searchable: usize,
    /// Part rows newly inserted.
    pub parts_inserted: usize,
    /// Session rows that were already present.
    pub sessions_matched: usize,
    /// Message rows that were already present, searchable or not.
    pub messages_matched_total: usize,
    /// Subset of `messages_matched_total` that carries search text.
    pub messages_matched_searchable: usize,
    /// Part rows that were already present.
    pub parts_matched: usize,
    /// Rejected non-session rows (messages and parts).
    pub dropped_events: usize,
    /// Rejected session rows.
    pub dropped_sessions: usize,
    /// Files skipped by the caller (semantics defined by the ingest driver).
    pub skipped_files: usize,
    /// Inputs skipped as empty (semantics defined by the ingest driver).
    pub skipped_empty: usize,
    /// Inputs skipped as already fresh (semantics defined by the ingest driver).
    pub skipped_fresh: usize,
    /// Storage failures recorded by the caller.
    pub storage_errors: usize,
    /// Values truncated during ingest, as recorded by the caller.
    pub truncated_values: usize,
    /// Rejection counts keyed by `RowError::reason_key`. Rejections without a
    /// key are counted under `DROP_REASON_UNCATEGORIZED`.
    pub drop_reasons: BTreeMap<&'static str, usize>,
}
1969
/// A message id repeated within one session substream.
pub const DROP_REASON_DUPLICATE_MESSAGE_ID: &str = "duplicate_message_id";
/// A `(message_id, part_id)` pair repeated within one session substream.
pub const DROP_REASON_DUPLICATE_PART_KEY: &str = "duplicate_part_key";
/// A message arrived while no session was open.
pub const DROP_REASON_MESSAGE_BEFORE_SESSION: &str = "message_before_session";
/// A message's `session_id` differs from the open session.
pub const DROP_REASON_MESSAGE_SESSION_MISMATCH: &str = "message_session_mismatch";
/// A part arrived while no message was open.
pub const DROP_REASON_PART_BEFORE_MESSAGE: &str = "part_before_message";
/// A part's `session_id` or `message_id` differs from the open message.
pub const DROP_REASON_PART_MESSAGE_MISMATCH: &str = "part_message_mismatch";
/// A session's `source_agent` is empty after trimming.
pub const DROP_REASON_EMPTY_SOURCE_AGENT: &str = "empty_source_agent";
/// A session sets `parent_message_id` without `parent_session_id`.
pub const DROP_REASON_PARENT_MESSAGE_WITHOUT_SESSION: &str = "parent_message_without_session";
/// Presumably: an upsert would change an existing session's project (confirm at use site).
pub const DROP_REASON_IMMUTABLE_PROJECT: &str = "immutable_project";
/// Presumably: an upsert would change an existing session's source agent (confirm at use site).
pub const DROP_REASON_IMMUTABLE_SOURCE_AGENT: &str = "immutable_source_agent";
/// Fallback key for rejections that carry no `reason_key`.
pub const DROP_REASON_UNCATEGORIZED: &str = "uncategorized";
1986
/// Per-kind insert/match counts produced by writing one batch of substreams.
///
/// The `_searchable` message counts are subsets of the matching `_total`.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct BatchCounts {
    /// Session rows newly inserted.
    pub sessions_inserted: usize,
    /// Session rows that were already present.
    pub sessions_matched: usize,
    /// Message rows newly inserted.
    pub messages_inserted_total: usize,
    /// Newly inserted messages that carry search text.
    pub messages_inserted_searchable: usize,
    /// Message rows that were already present.
    pub messages_matched_total: usize,
    /// Already-present messages that carry search text.
    pub messages_matched_searchable: usize,
    /// Part rows newly inserted.
    pub parts_inserted: usize,
    /// Part rows that were already present.
    pub parts_matched: usize,
}
2005
2006impl IngestSummary {
2007 pub fn accepted(&self) -> usize {
2008 self.inserted + self.matched
2009 }
2010
2011 pub fn add_batch(&mut self, counts: &BatchCounts) {
2015 self.sessions_inserted += counts.sessions_inserted;
2016 self.sessions_matched += counts.sessions_matched;
2017 self.messages_inserted_total += counts.messages_inserted_total;
2018 self.messages_inserted_searchable += counts.messages_inserted_searchable;
2019 self.messages_matched_total += counts.messages_matched_total;
2020 self.messages_matched_searchable += counts.messages_matched_searchable;
2021 self.parts_inserted += counts.parts_inserted;
2022 self.parts_matched += counts.parts_matched;
2023 self.inserted +=
2024 counts.sessions_inserted + counts.messages_inserted_total + counts.parts_inserted;
2025 self.matched +=
2026 counts.sessions_matched + counts.messages_matched_total + counts.parts_matched;
2027 }
2028
2029 pub fn merge(&mut self, other: &Self) {
2033 self.inserted += other.inserted;
2034 self.matched += other.matched;
2035 self.sessions_inserted += other.sessions_inserted;
2036 self.messages_inserted_total += other.messages_inserted_total;
2037 self.messages_inserted_searchable += other.messages_inserted_searchable;
2038 self.parts_inserted += other.parts_inserted;
2039 self.sessions_matched += other.sessions_matched;
2040 self.messages_matched_total += other.messages_matched_total;
2041 self.messages_matched_searchable += other.messages_matched_searchable;
2042 self.parts_matched += other.parts_matched;
2043 self.dropped_events += other.dropped_events;
2044 self.dropped_sessions += other.dropped_sessions;
2045 self.skipped_files += other.skipped_files;
2046 self.skipped_empty += other.skipped_empty;
2047 self.skipped_fresh += other.skipped_fresh;
2048 self.storage_errors += other.storage_errors;
2049 self.truncated_values += other.truncated_values;
2050 for (key, value) in &other.drop_reasons {
2051 *self.drop_reasons.entry(key).or_insert(0) += value;
2052 }
2053 }
2054
2055 pub fn add_outcomes_errors_only(&mut self, outcomes: &[RowOutcome]) {
2060 for outcome in outcomes {
2061 if !matches!(outcome.status, OutcomeStatus::Error) {
2062 continue;
2063 }
2064 if outcome.kind == "session" {
2065 self.dropped_sessions += 1;
2066 } else {
2067 self.dropped_events += 1;
2068 }
2069 let reason = outcome
2070 .error
2071 .as_ref()
2072 .and_then(|error| error.reason_key)
2073 .unwrap_or(DROP_REASON_UNCATEGORIZED);
2074 *self.drop_reasons.entry(reason).or_insert(0) += 1;
2075 }
2076 }
2077
2078 pub fn add_outcomes(&mut self, outcomes: &[RowOutcome]) {
2079 for outcome in outcomes {
2080 match outcome.status {
2081 OutcomeStatus::Inserted => {
2082 self.inserted += 1;
2083 match outcome.kind {
2084 "session" => self.sessions_inserted += 1,
2085 "message" => {
2086 self.messages_inserted_total += 1;
2087 if outcome.searchable {
2088 self.messages_inserted_searchable += 1;
2089 }
2090 }
2091 "part" => self.parts_inserted += 1,
2092 _ => {}
2093 }
2094 }
2095 OutcomeStatus::Matched => {
2096 self.matched += 1;
2097 match outcome.kind {
2098 "session" => self.sessions_matched += 1,
2099 "message" => {
2100 self.messages_matched_total += 1;
2101 if outcome.searchable {
2102 self.messages_matched_searchable += 1;
2103 }
2104 }
2105 "part" => self.parts_matched += 1,
2106 _ => {}
2107 }
2108 }
2109 OutcomeStatus::Error => {
2110 if outcome.kind == "session" {
2116 self.dropped_sessions += 1;
2117 } else {
2118 self.dropped_events += 1;
2119 }
2120 let reason = outcome
2121 .error
2122 .as_ref()
2123 .and_then(|e| e.reason_key)
2124 .unwrap_or(DROP_REASON_UNCATEGORIZED);
2125 *self.drop_reasons.entry(reason).or_insert(0) += 1;
2126 }
2127 }
2128 }
2129 }
2130}
2131
2132#[derive(Debug, Clone, PartialEq)]
2137pub struct RowOutcome {
2138 pub index: usize,
2139 pub kind: &'static str,
2140 pub pk: Value,
2141 pub status: OutcomeStatus,
2142 pub error: Option<RowError>,
2143 pub searchable: bool,
2148}
2149
/// What happened to a single ingested row.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum OutcomeStatus {
    /// The row did not exist and was written.
    Inserted,
    /// The row already existed.
    Matched,
    /// The row was rejected; see `RowOutcome::error`.
    Error,
}
2156
/// Why a row was rejected.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct RowError {
    /// Human-readable description.
    pub message: String,
    /// Offending field, when one is identifiable.
    pub field: Option<&'static str>,
    /// Short classification such as "immutable", when provided.
    pub reason: Option<&'static str>,
    /// Stable key (one of the `DROP_REASON_*` constants) used for drop
    /// statistics.
    pub reason_key: Option<&'static str>,
}
2170
2171#[derive(Debug)]
2175struct BufferedSession {
2176 index: usize,
2177 session: Session,
2178}
2179
2180#[derive(Debug)]
2181struct BufferedMessage {
2182 index: usize,
2183 message: Message,
2184 parts: Vec<BufferedPart>,
2185 search_text: Option<String>,
2186}
2187
2188#[derive(Debug)]
2189struct BufferedPart {
2190 index: usize,
2191 part: Part,
2192}
2193
2194#[derive(Debug, Default)]
2211pub struct IngestValidator {
2212 session: Option<BufferedSession>,
2213 current_message: Option<BufferedMessage>,
2214 current_parts: Vec<BufferedPart>,
2215 messages: Vec<BufferedMessage>,
2216 seen_message_ids: HashSet<String>,
2220 seen_part_keys: HashSet<(String, String)>,
2223 completed: Vec<CompletedSubstream>,
2227}
2228
2229#[derive(Debug)]
2231struct CompletedSubstream {
2232 session_index: usize,
2233 session: Session,
2234 messages: Vec<BufferedMessage>,
2235}
2236
2237impl IngestValidator {
2238 pub async fn push(
2244 &mut self,
2245 store: &Store,
2246 index: usize,
2247 event: IngestEvent,
2248 ) -> Result<Vec<RowOutcome>> {
2249 match event {
2250 IngestEvent::Session(session) => self.push_session(store, index, session).await,
2251 IngestEvent::Message(message) => Ok(self.push_message(index, message)),
2252 IngestEvent::Part(part) => Ok(self.push_part(index, part)),
2253 }
2254 }
2255
2256 pub async fn finish(&mut self, store: &Store) -> Result<(Vec<RowOutcome>, BatchCounts)> {
2261 self.close_current_substream();
2262 self.flush(store).await
2263 }
2264
2265 pub async fn flush(&mut self, store: &Store) -> Result<(Vec<RowOutcome>, BatchCounts)> {
2272 if self.completed.is_empty() {
2273 return Ok((Vec::new(), BatchCounts::default()));
2274 }
2275 let completed = std::mem::take(&mut self.completed);
2276 store.upsert_session_batch(completed).await
2277 }
2278
2279 pub fn pending_substreams(&self) -> usize {
2282 self.completed.len()
2283 }
2284
2285 async fn push_session(
2286 &mut self,
2287 _store: &Store,
2288 index: usize,
2289 mut session: Session,
2290 ) -> Result<Vec<RowOutcome>> {
2291 self.close_current_substream();
2295
2296 let trimmed = session.source_agent.trim();
2301 if trimmed.is_empty() {
2302 return Ok(vec![RowOutcome {
2303 index,
2304 kind: "session",
2305 pk: Value::String(session.id.clone()),
2306 status: OutcomeStatus::Error,
2307 error: Some(RowError {
2308 message: format!("session {} has empty source_agent after trim", session.id),
2309 field: Some("source_agent"),
2310 reason: None,
2311 reason_key: Some(DROP_REASON_EMPTY_SOURCE_AGENT),
2312 }),
2313 searchable: false,
2314 }]);
2315 }
2316 if trimmed.len() != session.source_agent.len() {
2317 session.source_agent = trimmed.to_owned();
2318 }
2319
2320 if session.parent_message_id.is_some() && session.parent_session_id.is_none() {
2321 return Ok(vec![RowOutcome {
2322 index,
2323 kind: "session",
2324 pk: Value::String(session.id.clone()),
2325 status: OutcomeStatus::Error,
2326 error: Some(RowError {
2327 message: format!(
2328 "session {} has parent_message_id without parent_session_id",
2329 session.id,
2330 ),
2331 field: Some("parent_message_id"),
2332 reason: None,
2333 reason_key: Some(DROP_REASON_PARENT_MESSAGE_WITHOUT_SESSION),
2334 }),
2335 searchable: false,
2336 }]);
2337 }
2338
2339 self.seen_message_ids.clear();
2340 self.seen_part_keys.clear();
2341 self.session = Some(BufferedSession { index, session });
2342 Ok(Vec::new())
2343 }
2344
2345 fn close_current_substream(&mut self) {
2346 self.flush_current_message();
2347 let Some(BufferedSession {
2348 index: session_index,
2349 session,
2350 }) = self.session.take()
2351 else {
2352 return;
2353 };
2354 let messages = std::mem::take(&mut self.messages);
2355 self.seen_message_ids.clear();
2356 self.seen_part_keys.clear();
2357 self.completed.push(CompletedSubstream {
2358 session_index,
2359 session,
2360 messages,
2361 });
2362 }
2363
2364 fn push_message(&mut self, index: usize, message: Message) -> Vec<RowOutcome> {
2365 let pk = Value::Array(vec![
2366 Value::String(message.session_id().to_owned()),
2367 Value::String(message.id().to_owned()),
2368 ]);
2369 let Some(session) = &self.session else {
2370 return vec![error_outcome(
2371 index,
2372 "message",
2373 pk,
2374 "first event in a session stream must be Session",
2375 None,
2376 DROP_REASON_MESSAGE_BEFORE_SESSION,
2377 )];
2378 };
2379 if message.session_id() != session.session.id {
2380 let msg = format!(
2381 "message {} references session {}, expected {}",
2382 message.id(),
2383 message.session_id(),
2384 session.session.id
2385 );
2386 return vec![error_outcome(
2387 index,
2388 "message",
2389 pk,
2390 &msg,
2391 Some("session_id"),
2392 DROP_REASON_MESSAGE_SESSION_MISMATCH,
2393 )];
2394 }
2395 if !self.seen_message_ids.insert(message.id().to_owned()) {
2396 let msg = format!("duplicate message id {} in session substream", message.id());
2400 return vec![error_outcome(
2401 index,
2402 "message",
2403 pk,
2404 &msg,
2405 None,
2406 DROP_REASON_DUPLICATE_MESSAGE_ID,
2407 )];
2408 }
2409 self.flush_current_message();
2410 self.current_message = Some(BufferedMessage {
2411 index,
2412 message,
2413 parts: Vec::new(),
2414 search_text: None,
2415 });
2416 Vec::new()
2417 }
2418
2419 fn push_part(&mut self, index: usize, part: Part) -> Vec<RowOutcome> {
2420 let pk = Value::Array(vec![
2421 Value::String(part.session_id.clone()),
2422 Value::String(part.message_id.clone()),
2423 Value::String(part.id.clone()),
2424 ]);
2425 let Some(current) = &self.current_message else {
2426 return vec![error_outcome(
2427 index,
2428 "part",
2429 pk,
2430 "part event appeared before a message",
2431 None,
2432 DROP_REASON_PART_BEFORE_MESSAGE,
2433 )];
2434 };
2435 if part.session_id != current.message.session_id() {
2436 let msg = format!(
2437 "part {} references session {}, expected {}",
2438 part.id,
2439 part.session_id,
2440 current.message.session_id()
2441 );
2442 return vec![error_outcome(
2443 index,
2444 "part",
2445 pk,
2446 &msg,
2447 Some("session_id"),
2448 DROP_REASON_PART_MESSAGE_MISMATCH,
2449 )];
2450 }
2451 if part.message_id != current.message.id() {
2452 let msg = format!(
2453 "part {} references message {}, expected {}",
2454 part.id,
2455 part.message_id,
2456 current.message.id()
2457 );
2458 return vec![error_outcome(
2459 index,
2460 "part",
2461 pk,
2462 &msg,
2463 Some("message_id"),
2464 DROP_REASON_PART_MESSAGE_MISMATCH,
2465 )];
2466 }
2467 let part_key = (part.message_id.clone(), part.id.clone());
2468 if !self.seen_part_keys.insert(part_key) {
2469 let msg = format!(
2470 "duplicate part id {} for message {} in session substream",
2471 part.id, part.message_id
2472 );
2473 return vec![error_outcome(
2474 index,
2475 "part",
2476 pk,
2477 &msg,
2478 None,
2479 DROP_REASON_DUPLICATE_PART_KEY,
2480 )];
2481 }
2482 self.current_parts.push(BufferedPart { index, part });
2483 Vec::new()
2484 }
2485
2486 fn flush_current_message(&mut self) {
2487 let Some(mut buffered) = self.current_message.take() else {
2488 return;
2489 };
2490 let parts = std::mem::take(&mut self.current_parts);
2491 let mut canonical_parts = Vec::with_capacity(parts.len());
2492 for part in &parts {
2493 canonical_parts.push(part.part.clone());
2494 }
2495 buffered.search_text = search_text(&buffered.message, &canonical_parts);
2496 buffered.parts = parts;
2497 self.messages.push(buffered);
2498 }
2499}
2500
2501fn error_outcome(
2502 index: usize,
2503 kind: &'static str,
2504 pk: Value,
2505 message: &str,
2506 field: Option<&'static str>,
2507 reason_key: &'static str,
2508) -> RowOutcome {
2509 RowOutcome {
2510 index,
2511 kind,
2512 pk,
2513 status: OutcomeStatus::Error,
2514 error: Some(RowError {
2515 message: message.to_owned(),
2516 field,
2517 reason: None,
2518 reason_key: Some(reason_key),
2519 }),
2520 searchable: false,
2521 }
2522}
2523
2524fn error_outcomes_for_substream(
2529 session_index: usize,
2530 session: &Session,
2531 _messages: &[BufferedMessage],
2532 message: impl Into<String>,
2533 field: Option<&'static str>,
2534 reason_key: &'static str,
2535) -> Vec<RowOutcome> {
2536 let reason = field.map(|_| "immutable");
2537 vec![RowOutcome {
2538 index: session_index,
2539 kind: "session",
2540 pk: Value::String(session.id.clone()),
2541 status: OutcomeStatus::Error,
2542 error: Some(RowError {
2543 message: message.into(),
2544 field,
2545 reason,
2546 reason_key: Some(reason_key),
2547 }),
2548 searchable: false,
2549 }]
2550}
2551
2552fn success_outcomes_for_substream(
2558 session_index: usize,
2559 session: &Session,
2560 messages: &[BufferedMessage],
2561 existing_sessions: &std::collections::HashMap<String, Session>,
2562 existing_message_pks: &HashSet<(String, String)>,
2563 existing_part_pks: &HashSet<(String, String, String)>,
2564 counts: &mut BatchCounts,
2565) -> Vec<RowOutcome> {
2566 let session_was_present = existing_sessions.contains_key(&session.id);
2567 let session_status = if session_was_present {
2568 counts.sessions_matched += 1;
2569 UpsertStatus::Matched
2570 } else {
2571 counts.sessions_inserted += 1;
2572 UpsertStatus::Inserted
2573 };
2574
2575 let mut outcomes = Vec::with_capacity(1 + messages.len());
2576 outcomes.push(success_outcome(
2577 session_index,
2578 "session",
2579 Value::String(session.id.clone()),
2580 session_status,
2581 false,
2582 ));
2583 for buffered in messages {
2584 let key = (
2585 buffered.message.session_id().to_owned(),
2586 buffered.message.id().to_owned(),
2587 );
2588 let searchable = buffered.search_text.is_some();
2589 let message_status = if existing_message_pks.contains(&key) {
2590 counts.messages_matched_total += 1;
2591 if searchable {
2592 counts.messages_matched_searchable += 1;
2593 }
2594 UpsertStatus::Matched
2595 } else {
2596 counts.messages_inserted_total += 1;
2597 if searchable {
2598 counts.messages_inserted_searchable += 1;
2599 }
2600 UpsertStatus::Inserted
2601 };
2602 let pk = Value::Array(vec![Value::String(key.0), Value::String(key.1)]);
2603 outcomes.push(success_outcome(
2604 buffered.index,
2605 "message",
2606 pk,
2607 message_status,
2608 searchable,
2609 ));
2610 for part in &buffered.parts {
2611 let part_key = (
2612 part.part.session_id.clone(),
2613 part.part.message_id.clone(),
2614 part.part.id.clone(),
2615 );
2616 let part_status = if existing_part_pks.contains(&part_key) {
2617 counts.parts_matched += 1;
2618 UpsertStatus::Matched
2619 } else {
2620 counts.parts_inserted += 1;
2621 UpsertStatus::Inserted
2622 };
2623 let part_pk = Value::Array(vec![
2624 Value::String(part_key.0),
2625 Value::String(part_key.1),
2626 Value::String(part_key.2),
2627 ]);
2628 outcomes.push(success_outcome(
2629 part.index,
2630 "part",
2631 part_pk,
2632 part_status,
2633 false,
2634 ));
2635 }
2636 }
2637 outcomes
2638}
2639
2640fn success_outcome(
2641 index: usize,
2642 kind: &'static str,
2643 pk: Value,
2644 status: UpsertStatus,
2645 searchable: bool,
2646) -> RowOutcome {
2647 let status = match status {
2648 UpsertStatus::Inserted => OutcomeStatus::Inserted,
2649 UpsertStatus::Matched => OutcomeStatus::Matched,
2650 };
2651 RowOutcome {
2652 index,
2653 kind,
2654 pk,
2655 status,
2656 error: None,
2657 searchable,
2658 }
2659}
2660
/// Errors raised while validating an incoming session against its stored copy.
#[derive(Debug, Clone, PartialEq, Eq)]
enum IngestError {
    /// A field that must not change after a session is first stored differs between the
    /// stored row and the incoming one.
    ImmutableField {
        field: &'static str,
        session_id: String,
        stored: String,
        attempted: String,
    },
}

impl std::fmt::Display for IngestError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Single-variant enum: the destructuring pattern is irrefutable.
        let Self::ImmutableField {
            field,
            session_id,
            stored,
            attempted,
        } = self;
        write!(
            f,
            "session {session_id} {field} is immutable: stored {stored:?}, attempted {attempted:?}",
        )
    }
}

impl std::error::Error for IngestError {}
2692
2693fn ensure_immutable_match(
2697 existing: &Session,
2698 incoming: &Session,
2699) -> std::result::Result<(), IngestError> {
2700 if existing.source_agent != incoming.source_agent {
2701 return Err(IngestError::ImmutableField {
2702 field: "source_agent",
2703 session_id: incoming.id.clone(),
2704 stored: existing.source_agent.clone(),
2705 attempted: incoming.source_agent.clone(),
2706 });
2707 }
2708 if existing.project != incoming.project {
2709 return Err(IngestError::ImmutableField {
2710 field: "project",
2711 session_id: incoming.id.clone(),
2712 stored: (*existing.project).clone(),
2713 attempted: (*incoming.project).clone(),
2714 });
2715 }
2716 Ok(())
2717}
2718
2719pub fn search_text(message: &Message, parts: &[Part]) -> Option<String> {
2720 use crate::wire::Provenance;
2721 let mut chunks: Vec<String> = Vec::new();
2722 for part in parts {
2723 if part.provenance != Provenance::Conversational {
2726 continue;
2727 }
2728 match (message.role(), &part.kind) {
2729 (Role::User | Role::Assistant, PartKind::Text { text }) => {
2730 if let Some(text) = text {
2731 chunks.push(text.to_string());
2732 }
2733 }
2734 (
2735 Role::User | Role::Assistant,
2736 PartKind::File {
2737 media_type,
2738 file_name,
2739 data,
2740 },
2741 ) => {
2742 if let Some(file_name) = file_name {
2743 chunks.push(file_name.clone());
2744 }
2745 if let Some(media_type) = media_type {
2746 chunks.push(media_type.clone());
2747 }
2748 if let FileData::Url(uri) = data {
2749 chunks.push(uri.clone());
2750 }
2751 }
2752 (
2753 Role::System | Role::Tool,
2754 PartKind::Text { .. }
2755 | PartKind::Reasoning { .. }
2756 | PartKind::File { .. }
2757 | PartKind::ToolCall { .. }
2758 | PartKind::ToolResult { .. }
2759 | PartKind::ToolApprovalRequest { .. }
2760 | PartKind::ToolApprovalResponse { .. },
2761 )
2762 | (
2763 Role::User | Role::Assistant,
2764 PartKind::Reasoning { .. }
2765 | PartKind::ToolCall { .. }
2766 | PartKind::ToolResult { .. }
2767 | PartKind::ToolApprovalRequest { .. }
2768 | PartKind::ToolApprovalResponse { .. },
2769 ) => {}
2770 }
2771 }
2772
2773 let text = chunks
2774 .into_iter()
2775 .filter(|chunk| !chunk.trim().is_empty())
2776 .collect::<Vec<_>>()
2777 .join("\n");
2778 if text.is_empty() { None } else { Some(text) }
2779}
2780
/// Newtype around the text that was indexed for full-text search on a message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SearchText(String);

impl SearchText {
    /// Borrows the wrapped text.
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }

    /// Unwraps into the owned `String`.
    pub fn into_inner(self) -> String {
        let SearchText(text) = self;
        text
    }
}

impl AsRef<str> for SearchText {
    fn as_ref(&self) -> &str {
        self.as_str()
    }
}
2800
/// A message bundled with its parts.
#[derive(Debug, Clone, PartialEq)]
pub struct MessageWithParts {
    pub message: Message,
    pub parts: Vec<Part>,
}

/// A session bundled with its messages (each carrying its parts).
#[derive(Debug, Clone, PartialEq)]
pub struct SessionWithMessages {
    pub session: Session,
    pub messages: Vec<MessageWithParts>,
}

/// Request parameters for paging through a session's messages.
///
/// NOTE(review): `after_id` looks like an exclusive pagination cursor, and `limit` /
/// `budget_bytes` presumably feed `page_by` (count cap and size budget) — confirm
/// against the callers.
#[derive(Debug, Clone)]
pub struct SessionViewParams<'a> {
    pub mode: ResponseMode,
    pub after_id: Option<&'a str>,
    pub limit: usize,
    pub budget_bytes: usize,
    pub session_from: SessionFrom,
}

/// Request parameters for viewing a single message plus surrounding context.
///
/// NOTE(review): `context_depth` presumably controls how many sibling messages are
/// returned; `after_id`, `limit` and `budget_bytes` look like part pagination — confirm.
#[derive(Debug, Clone)]
pub struct MessageViewParams<'a> {
    pub context_depth: usize,
    pub after_id: Option<&'a str>,
    pub limit: usize,
    pub budget_bytes: usize,
}

/// Result of a keyed lookup that can also be given a pagination cursor.
///
/// `NotFound`: the requested item does not exist. `UnknownAfterId`: presumably the
/// supplied `after_id` cursor did not match any row (verify with callers). `Found`: the
/// lookup succeeded.
#[derive(Debug, Clone, PartialEq)]
pub enum GetLookup<T> {
    NotFound,
    UnknownAfterId,
    Found(T),
}

/// One page of a session view: the session plus a window of its messages.
///
/// NOTE(review): `messages_remaining` presumably counts messages after this page — confirm.
#[derive(Debug, Clone, PartialEq)]
pub struct SessionPage {
    pub session: Session,
    pub messages: Vec<RetrievedMessage>,
    pub messages_remaining: usize,
}

/// One page of a message view: the target message, a window of its parts, and siblings.
///
/// NOTE(review): `target_parts_remaining` presumably counts parts not included in
/// `target_parts` — confirm.
#[derive(Debug, Clone, PartialEq)]
pub struct MessagePage {
    pub session: Session,
    pub target: RetrievedMessage,
    pub target_parts: Vec<Part>,
    pub target_parts_remaining: usize,
    pub siblings: Vec<RetrievedMessage>,
}

/// A message as returned by the read APIs.
///
/// NOTE(review): `text` looks like the stored `search_text` column and `content` the
/// stored `content` column — confirm at the construction site.
#[derive(Debug, Clone, PartialEq)]
pub struct RetrievedMessage {
    pub id: String,
    pub role: Role,
    pub timestamp: DateTime<Utc>,
    pub text: Option<String>,
    pub content: Option<String>,
    pub parts: Vec<Part>,
}

/// Internal projection of a message row read during a scan (no parts attached).
#[derive(Debug, Clone)]
struct ScanRow {
    id: String,
    role: Role,
    timestamp: DateTime<Utc>,
    text: Option<String>,
    content: Option<String>,
}

/// A message row together with its non-optional search text.
#[derive(Debug, Clone)]
pub struct ConversationalRow {
    pub session_id: String,
    pub message_id: String,
    pub role: Role,
    pub timestamp: DateTime<Utc>,
    pub text: SearchText,
}
2893
/// Returns how many leading `items` fit on one page.
///
/// The page is capped at `limit`, which is clamped to `1..=1000`. Items are then taken
/// while the cumulative `size` (saturating) stays within `budget_bytes`. The first item is
/// always included, even when it alone exceeds the budget, so a non-empty input never
/// yields an empty page.
fn page_by<T>(items: &[T], limit: usize, budget_bytes: usize, size: impl Fn(&T) -> usize) -> usize {
    let window = &items[..items.len().min(limit.clamp(1, 1000))];
    let mut used = 0usize;
    window
        .iter()
        .enumerate()
        .position(|(offset, item)| {
            used = used.saturating_add(size(item));
            // Stop before the first item (other than the very first) that breaks the budget.
            offset > 0 && used > budget_bytes
        })
        .unwrap_or(window.len())
}
2912
2913fn role_from_str(value: &str) -> Result<Role> {
2914 match value {
2915 "system" => Ok(Role::System),
2916 "user" => Ok(Role::User),
2917 "assistant" => Ok(Role::Assistant),
2918 "tool" => Ok(Role::Tool),
2919 other => anyhow::bail!("unknown message role {other}"),
2920 }
2921}
2922
/// Scalar indexes declared on the `messages` table, as `(column, index type, index name)`.
/// Turned into `IndexIntent`s with `IndexTrigger::OnAnyRows` by
/// `pond_index_intents_with_vector_threshold`.
const MESSAGE_SCALAR_INDICES: &[(&str, BuiltinIndexType, &str)] = &[
    ("project", BuiltinIndexType::BTree, "messages_project_btree"),
    (
        "session_id",
        BuiltinIndexType::BTree,
        "messages_session_id_btree",
    ),
    (
        "timestamp",
        BuiltinIndexType::BTree,
        "messages_timestamp_btree",
    ),
    (
        "source_agent",
        BuiltinIndexType::Bitmap,
        "messages_source_agent_bitmap",
    ),
    ("role", BuiltinIndexType::Bitmap, "messages_role_bitmap"),
];

/// Scalar indexes declared on the `parts` table, as `(column, index type, index name)`.
const PARTS_SCALAR_INDICES: &[(&str, BuiltinIndexType, &str)] = &[
    (
        "session_id",
        BuiltinIndexType::BTree,
        "parts_session_id_btree",
    ),
    (
        "message_id",
        BuiltinIndexType::BTree,
        "parts_message_id_btree",
    ),
];

/// Scalar indexes declared on the `sessions` table, as `(column, index type, index name)`.
const SESSIONS_SCALAR_INDICES: &[(&str, BuiltinIndexType, &str)] =
    &[("id", BuiltinIndexType::BTree, "sessions_id_btree")];
2969
2970fn in_predicate(column: &'static str, values: &[String]) -> Predicate {
2971 Predicate::In(
2972 column,
2973 values.iter().cloned().map(ScalarValue::String).collect(),
2974 )
2975}
2976
2977fn embedded_scope(filter: &Predicate) -> Predicate {
2982 Predicate::And(vec![Predicate::IsNotNull("vector"), filter.clone()])
2983}
2984
2985fn statuses_from_inserted(total: usize, inserted_rows: u64) -> Vec<UpsertStatus> {
2986 let inserted = usize::try_from(inserted_rows)
2987 .unwrap_or(usize::MAX)
2988 .min(total);
2989 let mut statuses = Vec::with_capacity(total);
2990 statuses.extend(std::iter::repeat_n(UpsertStatus::Inserted, inserted));
2991 statuses.extend(std::iter::repeat_n(
2992 UpsertStatus::Matched,
2993 total.saturating_sub(inserted),
2994 ));
2995 statuses
2996}
2997
/// Table name constants for the three Lance datasets.
pub(crate) const SESSIONS: &str = "sessions";
pub(crate) const MESSAGES: &str = "messages";
pub(crate) const PARTS: &str = "parts";

/// Name of the n-gram inverted full-text index on `messages.search_text`.
pub const MESSAGES_FTS_INDEX: &str = "messages_search_text_fts";

/// Name of the IVF-PQ (cosine) vector index on `messages.vector`.
pub const MESSAGES_VECTOR_INDEX: &str = "messages_vector_ivfpq";

/// IVF-PQ tuning: bits per PQ code.
const IVF_PQ_NUM_BITS: u8 = 8;
/// IVF-PQ tuning: embedding dimensions per sub-vector (sub-vectors = dim / stride).
const IVF_PQ_SUB_VECTOR_STRIDE: usize = 8;
/// IVF-PQ tuning: maximum training iterations.
const IVF_PQ_MAX_ITERS: usize = 15;

/// Minimum and maximum n-gram lengths for the full-text index tokenizer.
const FTS_NGRAM_MIN: u32 = 3;
const FTS_NGRAM_MAX: u32 = 5;
3027
3028pub fn pond_index_intents() -> IndexIntents {
3031 pond_index_intents_with_vector_threshold(VECTOR_INDEX_ACTIVATION_ROWS)
3032}
3033
3034pub(crate) fn pond_index_intents_with_vector_threshold(vector_threshold: usize) -> IndexIntents {
3038 let mut messages = Vec::with_capacity(MESSAGE_SCALAR_INDICES.len() + 2);
3039 messages.push(IndexIntent {
3040 name: MESSAGES_FTS_INDEX,
3041 column: "search_text",
3042 trigger: IndexTrigger::OnAnyRows,
3043 params: IndexParamsKind::InvertedFtsNgram {
3044 min: FTS_NGRAM_MIN,
3045 max: FTS_NGRAM_MAX,
3046 },
3047 });
3048 for (column, kind, name) in MESSAGE_SCALAR_INDICES {
3049 messages.push(IndexIntent {
3050 name,
3051 column,
3052 trigger: IndexTrigger::OnAnyRows,
3053 params: IndexParamsKind::Scalar(kind.clone()),
3054 });
3055 }
3056 messages.push(IndexIntent {
3057 name: MESSAGES_VECTOR_INDEX,
3058 column: "vector",
3059 trigger: IndexTrigger::OnNonNullCount {
3060 column: "vector",
3061 threshold: vector_threshold,
3062 },
3063 params: IndexParamsKind::IvfPqCosine {
3064 sub_vectors: embedding_dim() / IVF_PQ_SUB_VECTOR_STRIDE,
3065 num_bits: IVF_PQ_NUM_BITS,
3066 max_iters: IVF_PQ_MAX_ITERS,
3067 },
3068 });
3069 let parts = PARTS_SCALAR_INDICES
3070 .iter()
3071 .map(|(column, kind, name)| IndexIntent {
3072 name,
3073 column,
3074 trigger: IndexTrigger::OnAnyRows,
3075 params: IndexParamsKind::Scalar(kind.clone()),
3076 })
3077 .collect();
3078 let sessions = SESSIONS_SCALAR_INDICES
3079 .iter()
3080 .map(|(column, kind, name)| IndexIntent {
3081 name,
3082 column,
3083 trigger: IndexTrigger::OnAnyRows,
3084 params: IndexParamsKind::Scalar(kind.clone()),
3085 })
3086 .collect();
3087 IndexIntents {
3088 sessions,
3089 messages,
3090 parts,
3091 }
3092}
3093
/// Embedding dimension used when `init_embedding_dim` has not been called.
pub const DEFAULT_EMBEDDING_DIM: usize = 384;

/// Process-wide embedding dimension, set at most once by `init_embedding_dim`.
static EMBEDDING_DIM_RUNTIME: std::sync::OnceLock<usize> = std::sync::OnceLock::new();

/// Returns the configured embedding dimension, or `DEFAULT_EMBEDDING_DIM` if unset.
pub fn embedding_dim() -> usize {
    match EMBEDDING_DIM_RUNTIME.get() {
        Some(&dim) => dim,
        None => DEFAULT_EMBEDDING_DIM,
    }
}

/// Records the process-wide embedding dimension.
///
/// First call wins: later calls (with any value) are ignored silently.
pub fn init_embedding_dim(dim: usize) {
    // `set` returns `Err(dim)` when already initialised; that is the documented no-op.
    let _ = EMBEDDING_DIM_RUNTIME.set(dim);
}
3119
3120pub(crate) fn write_params_for_create() -> WriteParams {
3127 WriteParams {
3128 data_storage_version: Some(LanceFileVersion::V2_1),
3129 enable_v2_manifest_paths: true,
3130 enable_stable_row_ids: true,
3131 auto_cleanup: Some(AutoCleanupParams {
3132 interval: 20,
3133 older_than: chrono::TimeDelta::days(1),
3134 }),
3135 skip_auto_cleanup: true,
3136 ..WriteParams::default()
3137 }
3138}
3139
3140fn export_schema(table: Table) -> Arc<Schema> {
3141 match table {
3142 Table::Sessions => session_schema(),
3143 Table::Messages => message_schema(),
3144 Table::Parts => part_schema(),
3145 }
3146}
3147
3148fn ensure_schema_matches_archive(dataset: &Dataset, table: Table) -> Result<()> {
3149 let expected = export_schema(table);
3150 let actual = lance::deps::arrow_schema::Schema::from(dataset.schema());
3151 let actual_names: Vec<_> = actual.fields().iter().map(|field| field.name()).collect();
3152 let expected_names: Vec<_> = expected.fields().iter().map(|field| field.name()).collect();
3153 if actual_names != expected_names {
3154 anyhow::bail!(
3155 "{} archive table has columns {actual_names:?} but this pond build expects {expected_names:?}",
3156 table.as_str(),
3157 );
3158 }
3159 Ok(())
3160}
3161
3162async fn open_archive_table(table: Table, source: &Path) -> Result<Dataset> {
3163 let source_uri = source
3164 .to_str()
3165 .with_context(|| format!("archive path is not UTF-8: {}", source.display()))?;
3166 let dataset = Dataset::open(source_uri)
3167 .await
3168 .with_context(|| format!("failed to open {} archive table", table.as_str()))?;
3169 ensure_schema_matches_archive(&dataset, table)?;
3170 Ok(dataset)
3171}
3172
3173pub(crate) fn session_schema() -> Arc<Schema> {
3174 Arc::new(Schema::new(vec![
3175 primary_field("id", DataType::Utf8, false),
3176 Field::new("parent_session_id", DataType::Utf8, true),
3177 Field::new("parent_message_id", DataType::Utf8, true),
3178 Field::new("source_agent", DataType::Utf8, false),
3179 Field::new(
3180 "created_at",
3181 DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
3182 false,
3183 ),
3184 Field::new("project", DataType::Utf8, false),
3185 json_field("options", false),
3186 ]))
3187}
3188
3189pub(crate) fn message_schema() -> Arc<Schema> {
3190 Arc::new(Schema::new(vec![
3191 primary_field("session_id", DataType::Utf8, false),
3192 primary_field("id", DataType::Utf8, false),
3193 Field::new(
3194 "timestamp",
3195 DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
3196 false,
3197 ),
3198 Field::new("role", DataType::Utf8, false),
3199 Field::new("source_agent", DataType::Utf8, false),
3200 Field::new("project", DataType::Utf8, false),
3201 Field::new("content", DataType::Utf8, true),
3202 Field::new("search_text", DataType::Utf8, true),
3203 Field::new("vector", embedding_vector_type(), true),
3206 Field::new("embedding_model", DataType::Utf8, true),
3207 json_field("options", false),
3208 ]))
3209}
3210
3211pub(crate) fn part_schema() -> Arc<Schema> {
3212 Arc::new(Schema::new(vec![
3213 primary_field("session_id", DataType::Utf8, false),
3214 primary_field("message_id", DataType::Utf8, false),
3215 primary_field("id", DataType::Utf8, false),
3216 Field::new("ordinal", DataType::Int32, false),
3217 Field::new("type", DataType::Utf8, false),
3218 Field::new("provenance", DataType::Utf8, false),
3221 json_field("variant_data", false),
3222 legacy_blob_field("data", true),
3223 json_field("options", false),
3224 ]))
3225}
3226
3227pub(crate) fn empty_batch(schema: Arc<Schema>) -> Result<RecordBatch> {
3228 let arrays = schema
3229 .fields()
3230 .iter()
3231 .map(|field| lance::deps::arrow_array::new_empty_array(field.data_type()))
3232 .collect();
3233 RecordBatch::try_new(schema, arrays).context("failed to build empty Lance batch")
3234}
3235
3236pub(crate) fn empty_reader(
3237 schema: Arc<Schema>,
3238) -> Result<
3239 RecordBatchIterator<
3240 std::vec::IntoIter<Result<RecordBatch, lance::deps::arrow_schema::ArrowError>>,
3241 >,
3242> {
3243 let batch = empty_batch(schema.clone())?;
3244 Ok(RecordBatchIterator::new(
3245 vec![Ok(batch)].into_iter(),
3246 schema,
3247 ))
3248}
3249
/// Borrowed input for one `messages` row: the message plus the session-level columns
/// (`source_agent`, `project`) and the precomputed search text that are denormalised
/// onto every message row.
pub(crate) struct MessageBatchRow<'a> {
    pub message: &'a Message,
    pub source_agent: &'a str,
    pub project: &'a str,
    pub search_text: Option<&'a str>,
}
3256
3257fn embedding_vector_type() -> DataType {
3263 DataType::FixedSizeList(
3264 Arc::new(Field::new("item", DataType::Float16, true)),
3265 embedding_dim() as i32,
3266 )
3267}
3268
3269fn embedding_update_schema() -> Arc<Schema> {
3273 Arc::new(Schema::new(vec![
3274 primary_field("session_id", DataType::Utf8, false),
3275 primary_field("id", DataType::Utf8, false),
3276 Field::new("vector", embedding_vector_type(), true),
3277 Field::new("embedding_model", DataType::Utf8, true),
3278 ]))
3279}
3280
3281pub(crate) fn embedding_update_batch(rows: &[EmbeddedMessage]) -> Result<RecordBatch> {
3284 let dim = embedding_dim();
3285 let mut flat = Vec::with_capacity(rows.len() * dim);
3286 for row in rows {
3287 if row.vector.len() != dim {
3288 anyhow::bail!(
3289 "embedding for message {} has dim {}, expected {dim}",
3290 row.id,
3291 row.vector.len(),
3292 );
3293 }
3294 flat.extend(row.vector.iter().map(|value| half::f16::from_f32(*value)));
3295 }
3296 let values = Float16Array::from(flat);
3297 let item_field = Arc::new(Field::new("item", DataType::Float16, true));
3298 let vectors = FixedSizeListArray::try_new(item_field, dim as i32, Arc::new(values), None)
3299 .context("failed to build embedding vector column")?;
3300
3301 RecordBatch::try_new(
3302 embedding_update_schema(),
3303 vec![
3304 Arc::new(StringArray::from(
3305 rows.iter()
3306 .map(|row| row.session_id.as_str())
3307 .collect::<Vec<_>>(),
3308 )),
3309 Arc::new(StringArray::from(
3310 rows.iter().map(|row| row.id.as_str()).collect::<Vec<_>>(),
3311 )),
3312 Arc::new(vectors),
3313 Arc::new(StringArray::from(vec![embed::model_id(); rows.len()])),
3314 ],
3315 )
3316 .context("failed to build embedding update batch")
3317}
3318
/// Byte budget per record-batch chunk, and the per-cell ceiling enforced by `guard_cell`
/// (2^30 bytes), chosen to stay well under Arrow's i32 offset limit.
const COLUMN_BYTE_BUDGET: usize = 1 << 30;

/// Splits rows into consecutive index ranges whose summed `cells` byte sizes stay within
/// `COLUMN_BYTE_BUDGET`.
///
/// A row that alone exceeds the budget gets a range of its own. An empty input yields no
/// ranges.
fn chunk_ranges(cells: &[usize]) -> Vec<std::ops::Range<usize>> {
    let mut ranges = Vec::new();
    let mut chunk_start = 0usize;
    let mut chunk_bytes = 0usize;
    for (index, &bytes) in cells.iter().enumerate() {
        let over_budget = chunk_bytes + bytes > COLUMN_BYTE_BUDGET;
        if over_budget && index > chunk_start {
            // Close the current chunk and start a new one with this row.
            ranges.push(chunk_start..index);
            chunk_start = index;
            chunk_bytes = bytes;
        } else {
            chunk_bytes += bytes;
        }
    }
    if chunk_start < cells.len() {
        ranges.push(chunk_start..cells.len());
    }
    ranges
}
3346
3347fn guard_cell(table: &str, pk: &str, bytes: usize) -> Result<()> {
3348 if bytes >= COLUMN_BYTE_BUDGET {
3349 anyhow::bail!(
3350 "{table} row {pk}: a {bytes}-byte text cell meets the per-cell ceiling and would \
3351 overflow Arrow's i32 offset buffer"
3352 );
3353 }
3354 Ok(())
3355}
3356
3357async fn merge_insert_chunks(
3358 handle: &Handle,
3359 table: Table,
3360 batches: Vec<RecordBatch>,
3361) -> Result<u64> {
3362 let mut inserted = 0u64;
3363 for batch in batches {
3364 let rows = batch.num_rows();
3365 inserted += handle.merge_insert(table, batch, rows).await?;
3366 }
3367 Ok(inserted)
3368}
3369
3370pub(crate) fn sessions_batches(sessions: &[Session]) -> Result<Vec<RecordBatch>> {
3371 let options = sessions
3372 .iter()
3373 .map(|session| json_bytes(&session.options))
3374 .collect::<Result<Vec<_>>>()?;
3375 let mut cells = Vec::with_capacity(sessions.len());
3376 for (session, encoded) in sessions.iter().zip(&options) {
3377 let columns = [
3378 session.id.len(),
3379 session.parent_session_id.as_deref().map_or(0, str::len),
3380 session.parent_message_id.as_deref().map_or(0, str::len),
3381 session.source_agent.len(),
3382 session.project.as_str().len(),
3383 encoded.len(),
3384 ];
3385 for bytes in columns {
3386 guard_cell("sessions", &session.id, bytes)?;
3387 }
3388 cells.push(columns.iter().sum());
3389 }
3390 chunk_ranges(&cells)
3391 .into_iter()
3392 .map(|range| sessions_chunk(&sessions[range.clone()], &options[range]))
3393 .collect()
3394}
3395
/// Builds one `sessions` record batch from a slice of sessions and their pre-encoded
/// `options` JSON (same length and order as `sessions`).
///
/// The column vector below is positional: it must list columns in exactly the order of
/// `session_schema()`.
fn sessions_chunk(sessions: &[Session], options: &[Vec<u8>]) -> Result<RecordBatch> {
    let schema = session_schema();
    RecordBatch::try_new(
        schema.clone(),
        vec![
            // id
            Arc::new(StringArray::from(
                sessions
                    .iter()
                    .map(|session| session.id.as_str())
                    .collect::<Vec<_>>(),
            )),
            // parent_session_id (nullable)
            Arc::new(StringArray::from(
                sessions
                    .iter()
                    .map(|session| session.parent_session_id.as_deref())
                    .collect::<Vec<_>>(),
            )),
            // parent_message_id (nullable)
            Arc::new(StringArray::from(
                sessions
                    .iter()
                    .map(|session| session.parent_message_id.as_deref())
                    .collect::<Vec<_>>(),
            )),
            // source_agent
            Arc::new(StringArray::from(
                sessions
                    .iter()
                    .map(|session| session.source_agent.as_str())
                    .collect::<Vec<_>>(),
            )),
            // created_at: microseconds, tagged UTC to match the schema's timestamp type
            Arc::new(
                TimestampMicrosecondArray::from(
                    sessions
                        .iter()
                        .map(|session| micros(session.created_at))
                        .collect::<Vec<_>>(),
                )
                .with_timezone("UTC"),
            ),
            // project
            Arc::new(StringArray::from(
                sessions
                    .iter()
                    .map(|session| session.project.as_str())
                    .collect::<Vec<_>>(),
            )),
            // options: pre-encoded JSON bytes
            Arc::new(LargeBinaryArray::from_iter_values(
                options.iter().map(Vec::as_slice),
            )),
        ],
    )
    .context("failed to build session batch")
}
3447
3448pub(crate) fn messages_batches(rows: &[MessageBatchRow<'_>]) -> Result<Vec<RecordBatch>> {
3449 let options = rows
3450 .iter()
3451 .map(|row| json_bytes(row.message.options()))
3452 .collect::<Result<Vec<_>>>()?;
3453 let mut cells = Vec::with_capacity(rows.len());
3454 for (row, encoded) in rows.iter().zip(&options) {
3455 let columns = [
3456 row.message.session_id().len(),
3457 row.message.id().len(),
3458 row.message.role().as_str().len(),
3459 row.source_agent.len(),
3460 row.project.len(),
3461 row.message.system_content().map_or(0, str::len),
3462 row.search_text.map_or(0, str::len),
3463 encoded.len(),
3464 ];
3465 for bytes in columns {
3466 guard_cell("messages", row.message.id(), bytes)?;
3467 }
3468 cells.push(columns.iter().sum());
3469 }
3470 chunk_ranges(&cells)
3471 .into_iter()
3472 .map(|range| messages_chunk(&rows[range.clone()], &options[range]))
3473 .collect()
3474}
3475
/// Builds one `messages` record batch from a slice of rows and their pre-encoded
/// `options` JSON (same length and order as `rows`).
///
/// The column vector below is positional: it must list columns in exactly the order of
/// `message_schema()`. `vector` and `embedding_model` are always written as nulls here.
/// NOTE(review): presumably they are filled later via `embedding_update_batch`; confirm
/// how `merge_insert` treats these null columns for already-embedded matched rows.
fn messages_chunk(rows: &[MessageBatchRow<'_>], options: &[Vec<u8>]) -> Result<RecordBatch> {
    let schema = message_schema();
    RecordBatch::try_new(
        schema.clone(),
        vec![
            // session_id
            Arc::new(StringArray::from(
                rows.iter()
                    .map(|row| row.message.session_id())
                    .collect::<Vec<_>>(),
            )),
            // id
            Arc::new(StringArray::from(
                rows.iter().map(|row| row.message.id()).collect::<Vec<_>>(),
            )),
            // timestamp: microseconds, tagged UTC to match the schema's timestamp type
            Arc::new(
                TimestampMicrosecondArray::from(
                    rows.iter()
                        .map(|row| micros(row.message.timestamp()))
                        .collect::<Vec<_>>(),
                )
                .with_timezone("UTC"),
            ),
            // role
            Arc::new(StringArray::from(
                rows.iter()
                    .map(|row| row.message.role().as_str())
                    .collect::<Vec<_>>(),
            )),
            // source_agent (denormalised from the session)
            Arc::new(StringArray::from(
                rows.iter().map(|row| row.source_agent).collect::<Vec<_>>(),
            )),
            // project (denormalised from the session)
            Arc::new(StringArray::from(
                rows.iter().map(|row| row.project).collect::<Vec<_>>(),
            )),
            // content: from `Message::system_content()` (nullable)
            Arc::new(StringArray::from(
                rows.iter()
                    .map(|row| row.message.system_content())
                    .collect::<Vec<_>>(),
            )),
            // search_text (nullable)
            Arc::new(StringArray::from(
                rows.iter().map(|row| row.search_text).collect::<Vec<_>>(),
            )),
            // vector: all-null placeholder
            new_null_array(&embedding_vector_type(), rows.len()),
            // embedding_model: all-null placeholder
            new_null_array(&DataType::Utf8, rows.len()),
            // options: pre-encoded JSON bytes
            Arc::new(LargeBinaryArray::from_iter_values(
                options.iter().map(Vec::as_slice),
            )),
        ],
    )
    .context("failed to build message batch")
}
3528
3529pub(crate) fn parts_batches(parts: &[Part]) -> Result<Vec<RecordBatch>> {
3530 let variant_data = parts
3531 .iter()
3532 .map(|part| part_variant_json(&part.kind))
3533 .collect::<Result<Vec<_>>>()?;
3534 let options = parts
3535 .iter()
3536 .map(|part| json_bytes(&part.options))
3537 .collect::<Result<Vec<_>>>()?;
3538 let mut cells = Vec::with_capacity(parts.len());
3539 for ((part, variant), encoded) in parts.iter().zip(&variant_data).zip(&options) {
3542 let columns = [
3543 part.session_id.len(),
3544 part.message_id.len(),
3545 part.id.len(),
3546 part.kind.type_name().len(),
3547 part.provenance.as_str().len(),
3548 variant.len(),
3549 encoded.len(),
3550 ];
3551 for bytes in columns {
3552 guard_cell("parts", &part.id, bytes)?;
3553 }
3554 cells.push(columns.iter().sum());
3555 }
3556 chunk_ranges(&cells)
3557 .into_iter()
3558 .map(|range| {
3559 parts_chunk(
3560 &parts[range.clone()],
3561 &variant_data[range.clone()],
3562 &options[range],
3563 )
3564 })
3565 .collect()
3566}
3567
/// Builds one `parts` record batch.
///
/// `variant_data` and `options` are pre-encoded JSON, with the same length and order as
/// `parts`. The column vector below is positional: it must list columns in exactly the
/// order of `part_schema()`.
fn parts_chunk(
    parts: &[Part],
    variant_data: &[Vec<u8>],
    options: &[Vec<u8>],
) -> Result<RecordBatch> {
    let schema = part_schema();
    // `data` blob column: the raw payload of file parts (string, bytes, or URL text);
    // null for every other part kind.
    let blob_payloads: Vec<Option<&[u8]>> = parts
        .iter()
        .map(|part| match &part.kind {
            PartKind::File { data, .. } => Some(match data {
                FileData::String(value) => value.as_bytes(),
                FileData::Bytes(value) => value.as_slice(),
                FileData::Url(value) => value.as_bytes(),
            }),
            PartKind::Text { .. }
            | PartKind::Reasoning { .. }
            | PartKind::ToolCall { .. }
            | PartKind::ToolResult { .. }
            | PartKind::ToolApprovalRequest { .. }
            | PartKind::ToolApprovalResponse { .. } => None,
        })
        .collect();
    let blob_array = LargeBinaryArray::from_iter(blob_payloads);

    RecordBatch::try_new(
        schema.clone(),
        vec![
            // session_id
            Arc::new(StringArray::from(
                parts
                    .iter()
                    .map(|part| part.session_id.as_str())
                    .collect::<Vec<_>>(),
            )),
            // message_id
            Arc::new(StringArray::from(
                parts
                    .iter()
                    .map(|part| part.message_id.as_str())
                    .collect::<Vec<_>>(),
            )),
            // id
            Arc::new(StringArray::from(
                parts
                    .iter()
                    .map(|part| part.id.as_str())
                    .collect::<Vec<_>>(),
            )),
            // ordinal
            Arc::new(Int32Array::from(
                parts.iter().map(|part| part.ordinal).collect::<Vec<_>>(),
            )),
            // type
            Arc::new(StringArray::from(
                parts
                    .iter()
                    .map(|part| part.kind.type_name())
                    .collect::<Vec<_>>(),
            )),
            // provenance
            Arc::new(StringArray::from(
                parts
                    .iter()
                    .map(|part| part.provenance.as_str())
                    .collect::<Vec<_>>(),
            )),
            // variant_data: pre-encoded JSON bytes
            Arc::new(LargeBinaryArray::from_iter_values(
                variant_data.iter().map(Vec::as_slice),
            )),
            // data (nullable blob)
            Arc::new(blob_array),
            // options: pre-encoded JSON bytes
            Arc::new(LargeBinaryArray::from_iter_values(
                options.iter().map(Vec::as_slice),
            )),
        ],
    )
    .context("failed to build parts batch")
}
3642
3643pub(crate) fn session_from_batch(batch: &RecordBatch, row: usize) -> Result<Session> {
3644 Ok(Session {
3645 id: string(batch, "id", row)?.context("session id is null")?,
3646 parent_session_id: string(batch, "parent_session_id", row)?,
3647 parent_message_id: string(batch, "parent_message_id", row)?,
3648 source_agent: string(batch, "source_agent", row)?.context("source_agent is null")?,
3649 created_at: datetime(batch, "created_at", row)?,
3650 project: crate::adapter::Extracted::from_stored(
3651 string(batch, "project", row)?.context("project is null")?,
3652 ),
3653 options: json_parse(&json_column(batch, "options", row)?.context("options is null")?)?,
3654 })
3655}
3656
3657pub(crate) fn message_from_batch(batch: &RecordBatch, row: usize) -> Result<Message> {
3658 let id = string(batch, "id", row)?.context("message id is null")?;
3659 let session_id = string(batch, "session_id", row)?.context("message session_id is null")?;
3660 let timestamp = datetime(batch, "timestamp", row)?;
3661 let options =
3662 json_parse(&json_column(batch, "options", row)?.context("message options is null")?)?;
3663
3664 match string(batch, "role", row)?
3665 .context("message role is null")?
3666 .as_str()
3667 {
3668 "system" => Ok(Message::System {
3669 id,
3670 session_id,
3671 timestamp,
3672 content: string(batch, "content", row)?.map(crate::adapter::Extracted::from_stored),
3679 options,
3680 }),
3681 "user" => Ok(Message::User {
3682 id,
3683 session_id,
3684 timestamp,
3685 options,
3686 }),
3687 "assistant" => Ok(Message::Assistant {
3688 id,
3689 session_id,
3690 timestamp,
3691 options,
3692 }),
3693 "tool" => Ok(Message::Tool {
3694 id,
3695 session_id,
3696 timestamp,
3697 options,
3698 }),
3699 other => anyhow::bail!("unknown message role {other}"),
3700 }
3701}
3702
3703pub(crate) fn part_from_batch(
3704 batch: &RecordBatch,
3705 row: usize,
3706 file_data: Option<FileData>,
3707) -> Result<Part> {
3708 let type_name = string(batch, "type", row)?.context("part type is null")?;
3709 let variant_data = json_column(batch, "variant_data", row)?.context("variant_data is null")?;
3710 let provenance = string(batch, "provenance", row)?.context("part provenance is null")?;
3711 Ok(Part {
3712 session_id: string(batch, "session_id", row)?.context("part session_id is null")?,
3713 message_id: string(batch, "message_id", row)?.context("part message_id is null")?,
3714 id: string(batch, "id", row)?.context("part id is null")?,
3715 ordinal: int32(batch, "ordinal", row)?,
3716 provenance: provenance_from_str(&provenance)?,
3717 options: json_parse(&json_column(batch, "options", row)?.context("part options is null")?)?,
3718 kind: part_kind_from_json(&type_name, &variant_data, file_data)?,
3719 })
3720}
3721
3722fn provenance_from_str(value: &str) -> Result<crate::wire::Provenance> {
3723 match value {
3724 "conversational" => Ok(crate::wire::Provenance::Conversational),
3725 "injected" => Ok(crate::wire::Provenance::Injected),
3726 other => anyhow::bail!("unknown part provenance {other}"),
3727 }
3728}
3729
3730fn file_data_from_blob(variant_data: &[u8], bytes: &[u8]) -> Result<FileData> {
3731 let kind = file_data_kind(variant_data)?;
3732 match kind.as_str() {
3733 "string" => {
3734 let text = std::str::from_utf8(bytes)
3735 .context("file string payload is not UTF-8")?
3736 .to_owned();
3737 Ok(FileData::String(text))
3738 }
3739 "bytes" => Ok(FileData::Bytes(bytes.to_vec())),
3740 "url" => Ok(FileData::Url(
3741 std::str::from_utf8(bytes)
3742 .context("file URL payload is not UTF-8")?
3743 .to_owned(),
3744 )),
3745 other => anyhow::bail!("unknown file data_kind {other}"),
3746 }
3747}
3748
3749fn file_data_kind(variant_data: &[u8]) -> Result<String> {
3750 let value = json_parse::<Value>(variant_data)?;
3751 value
3752 .get("data_kind")
3753 .and_then(Value::as_str)
3754 .map(str::to_owned)
3755 .context("file part variant_data missing data_kind")
3756}
3757
3758fn uint64<'a>(batch: &'a RecordBatch, name: &str) -> Result<&'a UInt64Array> {
3759 batch
3760 .column_by_name(name)
3761 .with_context(|| format!("missing column {name}"))?
3762 .as_any()
3763 .downcast_ref::<UInt64Array>()
3764 .with_context(|| format!("column {name} is not UInt64"))
3765}
3766
3767pub(crate) fn string(batch: &RecordBatch, name: &str, row: usize) -> Result<Option<String>> {
3768 let array = batch
3769 .column_by_name(name)
3770 .with_context(|| format!("missing column {name}"))?
3771 .as_any()
3772 .downcast_ref::<StringArray>()
3773 .with_context(|| format!("column {name} is not Utf8"))?;
3774 if array.is_null(row) {
3775 Ok(None)
3776 } else {
3777 Ok(Some(array.value(row).to_owned()))
3778 }
3779}
3780
3781fn json_column(batch: &RecordBatch, name: &str, row: usize) -> Result<Option<Vec<u8>>> {
3782 let column = batch
3786 .column_by_name(name)
3787 .with_context(|| format!("missing column {name}"))?;
3788 if let Some(array) = column.as_any().downcast_ref::<LargeBinaryArray>() {
3789 return if array.is_null(row) {
3790 Ok(None)
3791 } else {
3792 Ok(Some(
3793 lance_arrow::json::decode_json(array.value(row)).into_bytes(),
3794 ))
3795 };
3796 }
3797 if let Some(array) = column.as_any().downcast_ref::<StringArray>() {
3798 return if array.is_null(row) {
3799 Ok(None)
3800 } else {
3801 Ok(Some(array.value(row).as_bytes().to_vec()))
3802 };
3803 }
3804 if let Some(array) = column.as_any().downcast_ref::<LargeStringArray>() {
3805 return if array.is_null(row) {
3806 Ok(None)
3807 } else {
3808 Ok(Some(array.value(row).as_bytes().to_vec()))
3809 };
3810 }
3811 anyhow::bail!("column {name} is not a JSON-compatible array")
3812}
3813
3814fn int32(batch: &RecordBatch, name: &str, row: usize) -> Result<i32> {
3815 let array = batch
3816 .column_by_name(name)
3817 .with_context(|| format!("missing column {name}"))?
3818 .as_any()
3819 .downcast_ref::<Int32Array>()
3820 .with_context(|| format!("column {name} is not Int32"))?;
3821 Ok(array.value(row))
3822}
3823
3824pub(crate) fn float32(batch: &RecordBatch, name: &str, row: usize) -> Result<f32> {
3825 let array = batch
3826 .column_by_name(name)
3827 .with_context(|| format!("missing column {name}"))?
3828 .as_any()
3829 .downcast_ref::<Float32Array>()
3830 .with_context(|| format!("column {name} is not Float32"))?;
3831 Ok(array.value(row))
3832}
3833
3834pub(crate) fn datetime(batch: &RecordBatch, name: &str, row: usize) -> Result<DateTime<Utc>> {
3835 let array = batch
3836 .column_by_name(name)
3837 .with_context(|| format!("missing column {name}"))?
3838 .as_any()
3839 .downcast_ref::<TimestampMicrosecondArray>()
3840 .with_context(|| format!("column {name} is not timestamp_micros"))?;
3841 Utc.timestamp_micros(array.value(row))
3842 .single()
3843 .context("timestamp is out of range")
3844}
3845
3846fn primary_field(name: &str, data_type: DataType, nullable: bool) -> Field {
3847 Field::new(name, data_type, nullable).with_metadata(
3848 [(
3849 "lance-schema:unenforced-primary-key".to_owned(),
3850 "true".to_owned(),
3851 )]
3852 .into(),
3853 )
3854}
3855
3856fn legacy_blob_field(name: &str, nullable: bool) -> Field {
3866 Field::new(name, DataType::LargeBinary, nullable).with_metadata(
3867 [(lance_arrow::BLOB_META_KEY.to_owned(), "true".to_owned())]
3868 .into_iter()
3869 .collect(),
3870 )
3871}
3872
3873fn json_field(name: &str, nullable: bool) -> Field {
3874 lance_arrow::json::json_field(name, nullable)
3875}
3876
3877fn micros(timestamp: DateTime<Utc>) -> i64 {
3878 timestamp.timestamp_micros()
3879}
3880
3881fn json_bytes<T: Serialize>(value: &T) -> Result<Vec<u8>> {
3882 let text = serde_json::to_string(value).context("failed to serialize JSON field")?;
3890 lance_arrow::json::encode_json(&text)
3891 .map_err(|err| anyhow::anyhow!("failed to encode JSON field as JSONB: {err}"))
3892}
3893
3894fn json_parse<T: DeserializeOwned>(value: &[u8]) -> Result<T> {
3895 serde_json::from_slice(value).context("failed to parse JSON field")
3896}
3897
3898fn part_variant_json(kind: &PartKind) -> Result<Vec<u8>> {
3899 if let PartKind::File {
3900 media_type,
3901 file_name,
3902 data,
3903 } = kind
3904 {
3905 let data_kind = match data {
3906 FileData::String(_) => "string",
3907 FileData::Bytes(_) => "bytes",
3908 FileData::Url(_) => "url",
3909 };
3910 return json_bytes(&serde_json::json!({
3911 "media_type": media_type,
3912 "file_name": file_name,
3913 "data_kind": data_kind,
3914 }));
3915 }
3916 let value = serde_json::to_value(kind)?;
3917 let mut object = value
3918 .as_object()
3919 .cloned()
3920 .context("part variant did not serialize to an object")?;
3921 object.remove("type");
3922 json_bytes(&object)
3923}
3924
3925fn part_kind_from_json(
3926 type_name: &str,
3927 variant_data: &[u8],
3928 file_data: Option<FileData>,
3929) -> Result<PartKind> {
3930 let mut value = json_parse::<Value>(variant_data)?;
3931 let object = value
3932 .as_object_mut()
3933 .context("part variant data is not an object")?;
3934 object.insert("type".to_owned(), Value::String(type_name.to_owned()));
3935 if let Some(data) = file_data {
3936 object.remove("data_kind");
3937 object.insert("data".to_owned(), serde_json::to_value(data)?);
3938 }
3939 serde_json::from_value(value).context("failed to parse part kind")
3940}
3941
3942#[cfg(test)]
3943mod tests {
3944 #![allow(clippy::expect_used, clippy::unwrap_used)]
3945
3946 use super::*;
3947 use crate::{
3948 adapter::Extracted,
3949 handlers::ingest_events,
3950 wire::{FileData, Message, Part, PartKind, ProviderOptions, Session},
3951 };
3952 use chrono::Utc;
3953 use serde_json::json;
3954 use tempfile::TempDir;
3955
3956 fn synthetic_session(id: &str) -> Session {
3957 Session {
3958 id: id.to_owned(),
3959 parent_session_id: None,
3960 parent_message_id: None,
3961 source_agent: "claude-code".to_owned(),
3962 created_at: Utc::now(),
3963 project: crate::adapter::Extracted::from_test_value("/tmp/pond".to_owned()),
3964 options: ProviderOptions::new(),
3965 }
3966 }
3967
3968 #[test]
3969 fn search_text_excludes_injected_parts() {
3970 use crate::wire::Provenance;
3971 let message = Message::User {
3972 id: "m1".to_owned(),
3973 session_id: "s1".to_owned(),
3974 timestamp: Utc::now(),
3975 options: ProviderOptions::new(),
3976 };
3977 let text_part = |id: &str, text: &str, provenance: Provenance| Part {
3978 session_id: "s1".to_owned(),
3979 id: id.to_owned(),
3980 message_id: "m1".to_owned(),
3981 ordinal: 0,
3982 provenance,
3983 options: ProviderOptions::new(),
3984 kind: PartKind::Text {
3985 text: Some(Extracted::from_test_value(text.to_owned())),
3986 },
3987 };
3988
3989 let conversational = search_text(
3992 &message,
3993 &[text_part(
3994 "p1",
3995 "real human prompt",
3996 Provenance::Conversational,
3997 )],
3998 );
3999 assert_eq!(conversational.as_deref(), Some("real human prompt"));
4000
4001 let injected = search_text(
4002 &message,
4003 &[text_part(
4004 "p2",
4005 "<task-notification>...</task-notification>",
4006 Provenance::Injected,
4007 )],
4008 );
4009 assert!(
4010 injected.is_none(),
4011 "a message whose only part is injected has null search_text"
4012 );
4013 }
4014
4015 #[test]
4016 fn chunk_ranges_splits_on_byte_budget() {
4017 assert!(chunk_ranges(&[]).is_empty());
4018 assert_eq!(chunk_ranges(&[10, 10, 10]), vec![0..3]);
4019
4020 let two_thirds = COLUMN_BYTE_BUDGET * 2 / 3;
4021 assert_eq!(
4022 chunk_ranges(&[two_thirds, two_thirds, two_thirds]),
4023 vec![0..1, 1..2, 2..3],
4024 );
4025
4026 assert_eq!(
4028 chunk_ranges(&[10, COLUMN_BYTE_BUDGET + 1, 10]),
4029 vec![0..1, 1..2, 2..3],
4030 );
4031 }
4032
    /// An orphan part (its `message_id` was never pushed) is rejected on its
    /// own; the session and the later valid message/part still commit.
    #[tokio::test]
    async fn ordering_violation_drops_only_the_offending_event() -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let store = Store::open_local(temp.path()).await?;
        let session = synthetic_session("ordering");
        // Points at "missing-message", which no event in this test creates.
        let orphan_part = Part {
            session_id: session.id.clone(),
            id: "orphan-part".to_owned(),
            message_id: "missing-message".to_owned(),
            ordinal: 0,
            provenance: crate::wire::Provenance::Conversational,
            options: ProviderOptions::new(),
            kind: PartKind::Text {
                text: Some(Extracted::from_test_value("orphan".to_owned())),
            },
        };
        let valid_message = Message::User {
            id: "valid-message".to_owned(),
            session_id: session.id.clone(),
            timestamp: Utc::now(),
            options: ProviderOptions::new(),
        };
        let valid_part = Part {
            session_id: session.id.clone(),
            id: "valid-part".to_owned(),
            message_id: valid_message.id().to_owned(),
            ordinal: 0,
            provenance: crate::wire::Provenance::Conversational,
            options: ProviderOptions::new(),
            kind: PartKind::Text {
                text: Some(Extracted::from_test_value("kept".to_owned())),
            },
        };

        let mut validator = IngestValidator::default();
        validator
            .push(&store, 0, IngestEvent::Session(session.clone()))
            .await?;
        // The orphan arrives before any message; `push` itself returns the
        // single error outcome describing the ordering violation.
        let part_outcomes = validator
            .push(&store, 1, IngestEvent::Part(orphan_part))
            .await?;
        assert_eq!(part_outcomes.len(), 1);
        assert_eq!(part_outcomes[0].kind, "part");
        assert_eq!(part_outcomes[0].status, OutcomeStatus::Error);
        assert!(
            part_outcomes[0]
                .error
                .as_ref()
                .map(|e| e.message.contains("part event appeared before a message"))
                .unwrap_or(false),
            "error message must explain the ordering violation: {part_outcomes:?}"
        );
        // Later well-ordered events must be unaffected by the earlier rejection.
        validator
            .push(&store, 2, IngestEvent::Message(valid_message))
            .await?;
        validator
            .push(&store, 3, IngestEvent::Part(valid_part))
            .await?;
        validator.finish(&store).await?;

        let (sessions, messages, parts) = store.row_counts().await?;
        assert_eq!(sessions, 1, "session committed despite the orphan part");
        assert_eq!(messages, 1, "valid message committed");
        assert_eq!(parts, 1, "valid part committed; the orphan was dropped");

        Ok(())
    }
4104
    /// A second message reusing an already-pushed message id is rejected with
    /// an error naming that id; only the first message is committed.
    #[tokio::test]
    async fn duplicate_message_id_drops_the_second_keeps_the_first() -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let store = Store::open_local(temp.path()).await?;
        let session = synthetic_session("duplicate-message");
        let first = Message::User {
            id: "message-1".to_owned(),
            session_id: session.id.clone(),
            timestamp: Utc::now(),
            options: ProviderOptions::new(),
        };
        // Same id as `first`, different role.
        let second = Message::Assistant {
            id: "message-1".to_owned(),
            session_id: session.id.clone(),
            timestamp: Utc::now(),
            options: ProviderOptions::new(),
        };

        let mut validator = IngestValidator::default();
        validator
            .push(&store, 0, IngestEvent::Session(session.clone()))
            .await?;
        validator
            .push(&store, 1, IngestEvent::Message(first))
            .await?;
        let dup_outcomes = validator
            .push(&store, 2, IngestEvent::Message(second))
            .await?;
        assert_eq!(dup_outcomes.len(), 1);
        assert_eq!(dup_outcomes[0].status, OutcomeStatus::Error);
        assert!(
            dup_outcomes[0]
                .error
                .as_ref()
                .map(|e| e.message.contains("duplicate message id message-1"))
                .unwrap_or(false),
            "duplicate-id rejection must name the offending id: {dup_outcomes:?}"
        );

        validator.finish(&store).await?;
        let (sessions, messages, _) = store.row_counts().await?;
        assert_eq!(sessions, 1, "session committed");
        assert_eq!(messages, 1, "only the first message committed");

        Ok(())
    }
4154
4155 #[tokio::test(flavor = "multi_thread")]
4163 async fn optimize_indices_compacts_parts_with_blob_column() -> anyhow::Result<()> {
4164 use crate::wire::{FileData, PartKind, Provenance};
4165 let temp = TempDir::new()?;
4166 let store = Store::open_local(temp.path()).await?;
4167
4168 let session = synthetic_session("compact-blob");
4169 store
4170 .upsert_sessions(std::slice::from_ref(&session))
4171 .await?;
4172
4173 let make_part = |idx: usize, kind: PartKind| Part {
4174 session_id: session.id.clone(),
4175 message_id: format!("msg-{idx}"),
4176 id: format!("part-{idx}"),
4177 ordinal: 0,
4178 provenance: Provenance::Conversational,
4179 options: ProviderOptions::new(),
4180 kind,
4181 };
4182
4183 let batch_a = vec![
4184 make_part(
4185 0,
4186 PartKind::File {
4187 media_type: Some("text/plain".to_owned()),
4188 file_name: Some("a.txt".to_owned()),
4189 data: FileData::Bytes(b"alpha".to_vec()),
4190 },
4191 ),
4192 make_part(
4193 1,
4194 PartKind::File {
4195 media_type: Some("text/plain".to_owned()),
4196 file_name: Some("b.txt".to_owned()),
4197 data: FileData::String("beta".to_owned()),
4198 },
4199 ),
4200 ];
4201 store.upsert_parts(&batch_a).await?;
4202
4203 let batch_b = vec![
4204 make_part(
4205 2,
4206 PartKind::File {
4207 media_type: Some("application/octet-stream".to_owned()),
4208 file_name: None,
4209 data: FileData::Url("https://example.com/file".to_owned()),
4210 },
4211 ),
4212 make_part(
4213 3,
4214 PartKind::File {
4215 media_type: Some("image/png".to_owned()),
4216 file_name: Some("c.png".to_owned()),
4217 data: FileData::Bytes(vec![0x89, 0x50, 0x4e, 0x47]),
4218 },
4219 ),
4220 ];
4221 store.upsert_parts(&batch_b).await?;
4222
4223 store
4224 .optimize_indices(None, &MaintenancePolicy::always_compact())
4225 .await?
4226 .into_result()?;
4227
4228 Ok(())
4229 }
4230
    /// A file part ingested through the validator reads back from `get_session`
    /// equal to the original, blob payload included.
    #[tokio::test]
    async fn file_part_blob_v2_round_trips_through_get() -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let store = Store::open_local(temp.path()).await?;
        let session = synthetic_session("blob");
        let message = Message::User {
            id: "message-1".to_owned(),
            session_id: session.id.clone(),
            timestamp: Utc::now(),
            options: ProviderOptions::new(),
        };
        let part = Part {
            session_id: session.id.clone(),
            id: "part-1".to_owned(),
            message_id: message.id().to_owned(),
            ordinal: 0,
            provenance: crate::wire::Provenance::Conversational,
            options: ProviderOptions::new(),
            kind: PartKind::File {
                media_type: Some("text/plain".to_owned()),
                file_name: Some("payload.txt".to_owned()),
                data: FileData::Bytes(b"pond".to_vec()),
            },
        };

        let mut validator = IngestValidator::default();
        validator
            .push(&store, 0, IngestEvent::Session(session.clone()))
            .await?;
        validator
            .push(&store, 1, IngestEvent::Message(message.clone()))
            .await?;
        validator
            .push(&store, 2, IngestEvent::Part(part.clone()))
            .await?;
        validator.finish(&store).await?;

        let stored = store
            .get_session(&session.id)
            .await?
            .expect("session should exist");
        // Full structural equality: metadata and the byte payload must both survive.
        let stored_part = &stored.messages[0].parts[0];
        assert_eq!(stored_part, &part);

        Ok(())
    }
4277
4278 fn base_session() -> Session {
4289 Session {
4290 id: "01HXY00000000001".to_owned(),
4291 parent_session_id: None,
4292 parent_message_id: None,
4293 source_agent: "claude-code".to_owned(),
4294 created_at: Utc::now(),
4295 project: crate::adapter::Extracted::from_test_value("/home/me/proj".to_owned()),
4296 options: ProviderOptions::new(),
4297 }
4298 }
4299
4300 fn count_status(outcomes: &[RowOutcome], target: OutcomeStatus) -> usize {
4301 outcomes
4302 .iter()
4303 .filter(|outcome| outcome.status == target)
4304 .count()
4305 }
4306
    /// Re-ingesting a session that differs only in `options` reports `Matched`
    /// rather than an error or a second insert.
    #[tokio::test(flavor = "multi_thread")]
    async fn re_ingesting_a_session_with_unchanged_immutable_fields_is_idempotent()
    -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let store = Store::open_local(temp.path()).await?;

        let first = ingest_events(&store, vec![IngestEvent::Session(base_session())]).await?;
        assert_eq!(count_status(&first, OutcomeStatus::Inserted), 1);

        // Only the mutable `options` map changes between the two ingests.
        let mut again = base_session();
        again.options.insert("title".to_owned(), json!("renamed"));
        let second = ingest_events(&store, vec![IngestEvent::Session(again)]).await?;
        assert_eq!(
            count_status(&second, OutcomeStatus::Error),
            0,
            "options is mutable; the re-ingest must not surface an error: {second:?}",
        );
        assert_eq!(
            count_status(&second, OutcomeStatus::Matched),
            1,
            "unchanged immutable fields must match-insert via merge_insert",
        );

        Ok(())
    }
4332
    /// Changing the immutable `source_agent` on re-ingest yields a field-level
    /// `immutable` error and leaves the stored session untouched.
    #[tokio::test(flavor = "multi_thread")]
    async fn re_ingesting_with_changed_source_agent_is_rejected() -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let store = Store::open_local(temp.path()).await?;

        let first = ingest_events(&store, vec![IngestEvent::Session(base_session())]).await?;
        assert_eq!(count_status(&first, OutcomeStatus::Error), 0);

        let mut tampered = base_session();
        tampered.source_agent = "codex-cli".to_owned();
        let second = ingest_events(&store, vec![IngestEvent::Session(tampered)]).await?;
        assert_eq!(count_status(&second, OutcomeStatus::Error), 1);
        let err_row = second
            .iter()
            .find(|outcome| outcome.status == OutcomeStatus::Error)
            .expect("error outcome present");
        // The error must identify both the offending field and the reason.
        let err = err_row.error.as_ref().expect("error body present");
        assert_eq!(err.field, Some("source_agent"));
        assert_eq!(err.reason, Some("immutable"));

        // The originally stored value must still be in place.
        let stored = store
            .get_session(&base_session().id)
            .await?
            .expect("session row survives the rejected re-ingest");
        assert_eq!(stored.session.source_agent, "claude-code");

        Ok(())
    }
4362
    /// Changing the immutable `project` on re-ingest yields an error naming
    /// `project` and keeps the original stored project.
    #[tokio::test(flavor = "multi_thread")]
    async fn re_ingesting_with_changed_project_is_rejected() -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let store = Store::open_local(temp.path()).await?;

        let first = ingest_events(&store, vec![IngestEvent::Session(base_session())]).await?;
        assert_eq!(count_status(&first, OutcomeStatus::Error), 0);

        let mut tampered = base_session();
        tampered.project = crate::adapter::Extracted::from_test_value("/somewhere/else".to_owned());
        let second = ingest_events(&store, vec![IngestEvent::Session(tampered)]).await?;
        let err_row = second
            .iter()
            .find(|outcome| outcome.status == OutcomeStatus::Error)
            .expect("project change must surface an error outcome");
        assert_eq!(err_row.error.as_ref().unwrap().field, Some("project"));

        let stored = store
            .get_session(&base_session().id)
            .await?
            .expect("session row survives");
        assert_eq!(
            stored.session.project.as_str(),
            "/home/me/proj",
            "stored project must remain the original",
        );

        Ok(())
    }
4392
    /// A second ingest batch for an already-stored session must count the
    /// session as `Matched` while its brand-new messages and parts count as
    /// `Inserted`, both in `BatchCounts` and in the per-row outcomes.
    #[tokio::test(flavor = "multi_thread")]
    async fn batched_flush_attributes_new_messages_on_existing_session() -> anyhow::Result<()> {
        use crate::wire::Provenance;
        let temp = TempDir::new()?;
        let store = Store::open_local(temp.path()).await?;
        let session = base_session();

        let text_part = |part_id: &str, message_id: &str, body: &str| Part {
            session_id: session.id.clone(),
            id: part_id.to_owned(),
            message_id: message_id.to_owned(),
            ordinal: 0,
            provenance: Provenance::Conversational,
            options: ProviderOptions::new(),
            kind: PartKind::Text {
                text: Some(Extracted::from_test_value(body.to_owned())),
            },
        };
        let user_message = |id: &str| Message::User {
            id: id.to_owned(),
            session_id: session.id.clone(),
            timestamp: Utc::now(),
            options: ProviderOptions::new(),
        };

        // Batch 1: the session plus messages m1/m2, each with one text part.
        let mut validator = IngestValidator::default();
        validator
            .push(&store, 0, IngestEvent::Session(session.clone()))
            .await?;
        validator
            .push(&store, 1, IngestEvent::Message(user_message("m1")))
            .await?;
        validator
            .push(&store, 2, IngestEvent::Part(text_part("p1", "m1", "alpha")))
            .await?;
        validator
            .push(&store, 3, IngestEvent::Message(user_message("m2")))
            .await?;
        validator
            .push(&store, 4, IngestEvent::Part(text_part("p2", "m2", "beta")))
            .await?;
        let (_first_outcomes, first_counts) = validator.finish(&store).await?;
        assert_eq!(first_counts.sessions_inserted, 1);
        assert_eq!(first_counts.messages_inserted_total, 2);
        assert_eq!(first_counts.messages_inserted_searchable, 2);

        // Batch 2: a fresh validator re-sends the same session followed by
        // three new messages m3..m5, each with part p3..p5.
        let mut validator = IngestValidator::default();
        validator
            .push(&store, 0, IngestEvent::Session(session.clone()))
            .await?;
        for (idx, mid) in ["m3", "m4", "m5"].iter().enumerate() {
            let pid = format!("p{}", idx + 3);
            validator
                .push(&store, idx * 2 + 1, IngestEvent::Message(user_message(mid)))
                .await?;
            validator
                .push(
                    &store,
                    idx * 2 + 2,
                    IngestEvent::Part(text_part(&pid, mid, "gamma")),
                )
                .await?;
        }
        let (second_outcomes, second_counts) = validator.finish(&store).await?;

        assert_eq!(
            second_counts.sessions_inserted, 0,
            "existing session row must report as Matched, not Inserted",
        );
        assert_eq!(second_counts.sessions_matched, 1);
        assert_eq!(
            second_counts.messages_inserted_total, 3,
            "the three NEW messages must register as Inserted in BatchCounts",
        );
        assert_eq!(
            second_counts.messages_inserted_searchable, 3,
            "all three new messages carry conversational text -> searchable",
        );
        assert_eq!(second_counts.messages_matched_total, 0);
        assert_eq!(second_counts.parts_inserted, 3);
        assert_eq!(second_counts.parts_matched, 0);

        // Per-row outcomes must agree with the aggregate counts above.
        let session_outcome = second_outcomes
            .iter()
            .find(|outcome| outcome.kind == "session")
            .expect("session-row outcome present");
        assert_eq!(session_outcome.status, OutcomeStatus::Matched);
        for outcome in &second_outcomes {
            if outcome.kind == "message" || outcome.kind == "part" {
                assert_eq!(
                    outcome.status,
                    OutcomeStatus::Inserted,
                    "new row must be Inserted, got: {outcome:?}",
                );
            }
        }
        Ok(())
    }
4502
4503 async fn store_with_messages(
4507 temp: &TempDir,
4508 count: usize,
4509 ) -> anyhow::Result<(Store, Vec<MessageKey>)> {
4510 store_with_messages_at_threshold(temp, count, VECTOR_INDEX_ACTIVATION_ROWS).await
4511 }
4512
4513 async fn store_with_messages_at_threshold(
4516 temp: &TempDir,
4517 count: usize,
4518 _vector_threshold: usize,
4519 ) -> anyhow::Result<(Store, Vec<MessageKey>)> {
4520 let store = Store::open_local(temp.path()).await?;
4521 let sessions = 8.min(count.max(1));
4522 let mut events = Vec::new();
4523 for s in 0..sessions {
4524 events.push(IngestEvent::Session(Session {
4525 id: format!("session-{s}"),
4526 parent_session_id: None,
4527 parent_message_id: None,
4528 source_agent: "claude-code".to_owned(),
4529 created_at: Utc::now(),
4530 project: Extracted::from_test_value(format!("/proj/{}", s % 4)),
4531 options: ProviderOptions::new(),
4532 }));
4533 for i in (s..count).step_by(sessions) {
4534 let message_id = format!("msg-{i}");
4535 events.push(IngestEvent::Message(Message::User {
4536 id: message_id.clone(),
4537 session_id: format!("session-{s}"),
4538 timestamp: Utc::now(),
4539 options: ProviderOptions::new(),
4540 }));
4541 events.push(IngestEvent::Part(Part {
4542 session_id: format!("session-{s}"),
4543 id: format!("{message_id}-part"),
4544 message_id,
4545 ordinal: 0,
4546 provenance: crate::wire::Provenance::Conversational,
4547 options: ProviderOptions::new(),
4548 kind: PartKind::Text {
4549 text: Some(Extracted::from_test_value(format!("synthetic message {i}"))),
4550 },
4551 }));
4552 }
4553 }
4554 ingest_events(&store, events).await?;
4555 let keys = (0..count)
4556 .map(|i| MessageKey {
4557 session_id: format!("session-{}", i % sessions),
4558 message_id: format!("msg-{i}"),
4559 })
4560 .collect();
4561 Ok((store, keys))
4562 }
4563
4564 fn synthetic_vector(seed: usize) -> Vec<f32> {
4566 let mut state = (seed as u64)
4567 .wrapping_mul(0x9E37_79B9_7F4A_7C15)
4568 .wrapping_add(1);
4569 (0..embedding_dim())
4570 .map(|_| {
4571 state = state.wrapping_mul(6364136223846793005).wrapping_add(1);
4572 #[allow(clippy::cast_precision_loss)]
4573 let unit = (state >> 33) as f32 / (1u64 << 31) as f32;
4574 unit - 1.0
4575 })
4576 .collect()
4577 }
4578
4579 fn embedded(keys: &[MessageKey]) -> Vec<EmbeddedMessage> {
4581 keys.iter()
4582 .enumerate()
4583 .map(|(seed, key)| EmbeddedMessage {
4584 session_id: key.session_id.clone(),
4585 id: key.message_id.clone(),
4586 vector: synthetic_vector(seed),
4587 })
4588 .collect()
4589 }
4590
4591 fn embedding_update_batch_with_model(
4592 rows: &[EmbeddedMessage],
4593 model: &str,
4594 ) -> Result<RecordBatch> {
4595 let mut batch = embedding_update_batch(rows)?;
4596 let columns = batch
4597 .columns()
4598 .iter()
4599 .take(3)
4600 .cloned()
4601 .chain(std::iter::once(
4602 Arc::new(StringArray::from(vec![model; rows.len()])) as _,
4603 ))
4604 .collect::<Vec<_>>();
4605 batch = RecordBatch::try_new(batch.schema(), columns)?;
4606 Ok(batch)
4607 }
4608
    /// A `session_id` equality filter on a vector query must appear in the
    /// query plan as a `ScalarIndexQuery`, not as a post-filter `FilterExec`.
    #[tokio::test]
    async fn filtered_vector_scan_pushes_scalar_predicate_into_the_index() -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let (store, keys) = store_with_messages(&temp, 4).await?;
        store.write_embeddings(&embedded(&keys)).await?;
        store
            .optimize_indices(None, &MaintenancePolicy::always_compact())
            .await?
            .into_result()?;

        let query = vec![0.01_f32; embedding_dim()];
        let plan = store
            .explain_vector_plan(
                &query,
                10,
                &Predicate::Eq("session_id", "session-3".into()),
                None,
            )
            .await?;

        // `plan` is the rendered plan text; inspect it for the expected operators.
        assert!(
            plan.contains("ScalarIndexQuery"),
            "expected a ScalarIndexQuery node in the plan:\n{plan}",
        );
        let predicate_postfiltered = plan
            .lines()
            .any(|line| line.contains("FilterExec") && line.contains("session_id"));
        assert!(
            !predicate_postfiltered,
            "the scalar predicate must not fall back to a FilterExec postfilter:\n{plan}",
        );
        Ok(())
    }
4649
    /// The IVF_PQ index is absent with 255 embedded rows and appears once the
    /// 256th is written, given an activation threshold of 256.
    #[tokio::test]
    async fn vector_index_activates_when_threshold_is_crossed() -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let (store, keys) = store_with_messages_at_threshold(&temp, 300, 256).await?;

        // 255 embedded rows: one short of the threshold passed to optimize below.
        store.write_embeddings(&embedded(&keys[..255])).await?;
        store
            .optimize_indices_with_vector_threshold(256)
            .await?
            .into_result()?;
        assert!(
            !store
                .handle
                .messages_index_names()
                .await?
                .iter()
                .any(|name| name == MESSAGES_VECTOR_INDEX),
            "IVF_PQ must not exist below the activation threshold",
        );

        // One more embedded row reaches the threshold.
        store.write_embeddings(&embedded(&keys[255..256])).await?;
        store
            .optimize_indices_with_vector_threshold(256)
            .await?
            .into_result()?;
        assert!(
            store
                .handle
                .messages_index_names()
                .await?
                .iter()
                .any(|name| name == MESSAGES_VECTOR_INDEX),
            "optimize must create the IVF_PQ once the threshold is crossed",
        );

        // Querying with key 0's own vector should surface key 0 among the top hits.
        let hits = store
            .vector_search(&synthetic_vector(0), 10, &Predicate::And(Vec::new()), None)
            .await?;
        assert!(
            hits.iter().any(|(key, _)| key == &keys[0]),
            "an embedded row is retrievable via the index",
        );
        Ok(())
    }
4700
    /// After embeddings written under another model id, every row counts as
    /// stale. Dropping the index and re-embedding clears the stale set, and the
    /// next optimize rebuilds IVF_PQ.
    #[tokio::test]
    async fn model_swap_force_re_embeds_only_stale_rows_and_rebuilds_ivf_pq() -> anyhow::Result<()>
    {
        let temp = TempDir::new()?;
        let (store, keys) = store_with_messages_at_threshold(&temp, 300, 256).await?;
        // Write every embedding tagged with "old-model" so all rows become stale
        // relative to the current model.
        let old_rows = embedded(&keys);
        let old_batch = embedding_update_batch_with_model(&old_rows, "old-model")?;
        store
            .handle
            .merge_update(Table::Messages, old_batch, old_rows.len())
            .await?;
        store
            .optimize_indices_with_vector_threshold(256)
            .await?
            .into_result()?;
        assert!(
            store
                .handle
                .messages_index_names()
                .await?
                .iter()
                .any(|name| name == MESSAGES_VECTOR_INDEX),
            "IVF_PQ must exist before a model swap",
        );
        assert_eq!(store.stale_embedding_count().await?, keys.len());

        // Force path: drop the index, then drain the pending/stale stream.
        store.drop_vector_index().await?;
        let mut pending = Vec::new();
        let stream = store.pending_or_stale_messages();
        tokio::pin!(stream);
        while let Some(row) = stream.next().await {
            pending.push(row?);
        }
        assert_eq!(
            pending.len(),
            keys.len(),
            "force stream should see stale rows"
        );
        // Re-embedding with the current model clears the stale set.
        store.write_embeddings(&embedded(&keys)).await?;
        assert_eq!(store.stale_embedding_count().await?, 0);
        store
            .optimize_indices_with_vector_threshold(256)
            .await?
            .into_result()?;
        assert!(
            store
                .handle
                .messages_index_names()
                .await?
                .iter()
                .any(|name| name == MESSAGES_VECTOR_INDEX),
            "optimize must rebuild IVF_PQ after force re-embed",
        );

        let stream = store.pending_or_stale_messages();
        tokio::pin!(stream);
        assert!(stream.next().await.is_none(), "up-to-date rows are skipped");
        Ok(())
    }
4760
    /// After old versions of the sessions table are cleaned up,
    /// `session_last_ingested_at` must still return an entry for every session.
    #[tokio::test]
    async fn session_last_ingested_at_falls_back_when_versions_pruned() -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let (store, _keys) = store_with_messages(&temp, 4).await?;

        // Each extra upsert produces another dataset version to prune.
        for tag in 0..3 {
            let extra = synthetic_session(&format!("extra-{tag}"));
            store.upsert_sessions(&[extra]).await?;
        }

        // Zero age cutoff: clean up every version older than "now".
        let dataset = store.handle.dataset(Table::Sessions).await?;
        dataset
            .cleanup_old_versions(chrono::Duration::zero(), None, Some(false))
            .await
            .context("cleanup_old_versions failed")?;

        let map = store.session_last_ingested_at().await?;
        let session_count = store.row_counts().await?.0;
        assert!(
            map.len() >= session_count,
            "watermark map ({}) must still cover every session ({}) after \
             version cleanup; an empty fallback regresses pond sync to a \
             full re-scan",
            map.len(),
            session_count,
        );
        Ok(())
    }
4803
    /// `embedding_progress` reports embedded vs. total eligible rows for the
    /// current model, and updates as embeddings are written in two steps.
    #[tokio::test]
    async fn embedding_progress_counts_embedded_and_eligible_rows() -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let (store, keys) = store_with_messages(&temp, 10).await?;

        let before = store.embedding_progress().await?;
        assert_eq!(before.embedded, 0);
        assert_eq!(before.total, 10);
        assert_eq!(before.model, crate::embed::model_id());

        // Embed the first 4 of 10 rows.
        store.write_embeddings(&embedded(&keys[..4])).await?;
        let partial = store.embedding_progress().await?;
        assert_eq!(partial.embedded, 4);
        assert_eq!(partial.total, 10);

        // Embed the remaining 6.
        store.write_embeddings(&embedded(&keys[4..])).await?;
        let full = store.embedding_progress().await?;
        assert_eq!(full.embedded, 10);
        assert_eq!(full.total, 10);
        Ok(())
    }
4825}