1use std::path::PathBuf;
35use std::str::FromStr;
36use std::sync::Arc;
37
38use rusqlite::{Connection, OptionalExtension, TransactionBehavior, params, params_from_iter};
39use solo_core::{
40 Embedder, Embedding, Episode, Error, InvalidateEvent, MemoryId, Result, Tier, VectorIndex,
41};
42use tokio::runtime::Handle;
43use tokio::sync::{RwLock as AsyncRwLock, broadcast, mpsc, oneshot};
44
45use crate::audit::{AuditEvent, AuditOperation, AuditResult, insert_audit_row_in_tx};
46use crate::backup::backup_from_connection;
47use crate::hnsw_id::{chunk_hnsw_id, episode_hnsw_id};
48use crate::key_material::KeyMaterial;
49
/// Bound of the writer command queue used by every `spawn*` constructor that
/// does not take an explicit capacity.
pub const DEFAULT_CHANNEL_CAPACITY: usize = 1024;

/// Capacity for the invalidate broadcast channel.
///
/// NOTE(review): the channel itself is created outside this part of the file;
/// confirm the call site actually uses this constant.
pub const INVALIDATE_BROADCAST_CAPACITY: usize = 256;

/// Largest number of items accepted in a single remember batch.
///
/// NOTE(review): enforcement is not visible in this part of the file.
pub const MAX_REMEMBER_BATCH_SIZE: usize = 200;
79
80#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
85pub struct ConsolidationScope {
86 pub window_days: Option<i64>,
91 #[serde(default)]
99 pub force_merge: bool,
100}
101
102#[derive(Debug, Clone, Default, serde::Serialize)]
108pub struct ConsolidationReport {
109 pub episodes_seen: usize,
111 pub clusters_built: usize,
114 pub episodes_clustered: usize,
117 pub clusters_merged: usize,
124 pub clusters_absorbed: usize,
134 pub existing_clusters_merged: usize,
148 pub abstractions_regenerated: usize,
158 pub abstractions_built: usize,
162 pub triples_built: usize,
166 pub contradictions_found: usize,
168}
169
/// Options for a re-embedding run; carried by `WriteCommand::Reembed`.
///
/// `Default` yields `from = None`, `dry_run = false`, `gc = false`.
#[derive(Debug, Clone, Default)]
pub struct ReembedScope {
    /// Optional pair of strings selecting the source rows.
    ///
    /// NOTE(review): presumably identifies the embedder to migrate from (e.g.
    /// name and version) — the tuple's meaning is not visible here.
    pub from: Option<(String, String)>,
    /// Requests a dry run; echoed back in `ReembedReport::dry_run`.
    pub dry_run: bool,
    /// Requests garbage collection as part of the run (presumably of stale
    /// rows, cf. `ReembedReport::rows_gc_deleted` — confirm in the handler).
    pub gc: bool,
}
186
/// Outcome of a re-embedding run.
///
/// Field meanings are inferred from their names; the handler that fills them
/// is not visible in this part of the file.
#[derive(Debug, Clone, Default)]
pub struct ReembedReport {
    /// Rows examined.
    pub rows_seen: usize,
    /// Rows that received a new embedding.
    pub rows_reembedded: usize,
    /// Rows for which re-embedding failed.
    pub rows_failed: usize,
    /// Rows deleted by garbage collection.
    pub rows_gc_deleted: usize,
    /// Whether the run was a dry run.
    pub dry_run: bool,
}
201
202#[derive(Debug, Clone, Default, serde::Serialize)]
212pub struct NormalizeReport {
213 pub aliases_processed: usize,
215 pub subject_rows_updated: usize,
219 pub object_rows_updated: usize,
222 pub dry_run: bool,
225}
226
/// Ingest size limit (50 MiB) returned by `resolve_ingest_max_bytes` when the
/// environment variable is unset or unparseable.
pub const DEFAULT_INGEST_MAX_BYTES: u64 = 50 * 1024 * 1024;
/// Name of the environment variable that overrides the ingest size limit.
pub(crate) const SOLO_INGEST_MAX_BYTES_ENV: &str = "SOLO_INGEST_MAX_BYTES";
237
238pub fn resolve_ingest_max_bytes() -> Option<u64> {
255 match std::env::var(SOLO_INGEST_MAX_BYTES_ENV) {
256 Err(_) => Some(DEFAULT_INGEST_MAX_BYTES),
257 Ok(raw) => {
258 let trimmed = raw.trim();
259 match trimmed.parse::<u64>() {
260 Ok(0) => None,
261 Ok(n) => Some(n),
262 Err(_) => {
263 tracing::warn!(
264 value = %raw,
265 env = SOLO_INGEST_MAX_BYTES_ENV,
266 default_bytes = DEFAULT_INGEST_MAX_BYTES,
267 "unparseable SOLO_INGEST_MAX_BYTES; falling back to default"
268 );
269 Some(DEFAULT_INGEST_MAX_BYTES)
270 }
271 }
272 }
273 }
274}
275
276#[derive(Debug, Clone, serde::Serialize)]
289pub struct IngestReport {
290 pub doc_id: solo_core::DocumentId,
291 pub chunks_persisted: u32,
292 pub bytes_ingested: u64,
293 pub deduped: bool,
294}
295
296#[derive(Debug, Clone, serde::Serialize)]
306pub struct ForgetDocumentReport {
307 pub doc_id: solo_core::DocumentId,
308 pub chunks_tombstoned: u32,
309}
310
311#[derive(Debug, Clone, serde::Serialize)]
314pub struct MemoryUpdateReport {
315 pub memory_id: MemoryId,
316 pub rowid: i64,
317 pub content: String,
318 pub updated_at_ms: i64,
319}
320
321#[derive(Debug)]
330pub enum WriteCommand {
331 Remember {
332 episode: Episode,
333 embedding: Embedding,
334 audit_principal: Option<String>,
335 reply: oneshot::Sender<Result<MemoryId>>,
336 },
337 RememberBatch {
356 items: Vec<(Episode, Embedding)>,
357 audit_principal: Option<String>,
358 reply: oneshot::Sender<Result<Vec<MemoryId>>>,
359 },
360 Forget {
361 memory_id: MemoryId,
362 reason: String,
363 audit_principal: Option<String>,
364 reply: oneshot::Sender<Result<()>>,
365 },
366 Update {
367 memory_id: MemoryId,
368 content: String,
369 embedding: Embedding,
370 audit_principal: Option<String>,
371 reply: oneshot::Sender<Result<MemoryUpdateReport>>,
372 },
373 IngestDocument {
385 path: std::path::PathBuf,
386 chunk_config: crate::document::ChunkConfig,
387 audit_principal: Option<String>,
388 reply: oneshot::Sender<Result<IngestReport>>,
389 },
390 ForgetDocument {
396 doc_id: solo_core::DocumentId,
397 audit_principal: Option<String>,
398 reply: oneshot::Sender<Result<ForgetDocumentReport>>,
399 },
400 Consolidate {
401 scope: ConsolidationScope,
402 audit_principal: Option<String>,
403 reply: oneshot::Sender<Result<ConsolidationReport>>,
404 },
405 Reembed {
406 scope: ReembedScope,
407 audit_principal: Option<String>,
408 reply: oneshot::Sender<Result<ReembedReport>>,
409 },
410 SaveSnapshot {
411 reply: oneshot::Sender<Result<()>>,
412 },
413 Backup {
422 dest_path: PathBuf,
423 reply: oneshot::Sender<Result<()>>,
424 },
425 NormalizeSubjects {
438 aliases: Vec<(String, String)>,
443 dry_run: bool,
448 audit_principal: Option<String>,
449 reply: oneshot::Sender<Result<NormalizeReport>>,
450 },
451 EmitLlmSamplingAudit {
471 event: AuditEvent,
472 reply: oneshot::Sender<Result<()>>,
473 },
474 AttachAbstractionBatch {
503 items: Vec<(MemoryId, solo_core::SemanticAbstraction)>,
508 episode_count: usize,
511 duration_ms: u64,
515 clusters_deferred: usize,
524 audit_principal: Option<String>,
528 reply: oneshot::Sender<Result<AttachAbstractionBatchReport>>,
529 },
530 ResolveContradiction {
539 a_id: String,
540 b_id: String,
541 kind: String,
542 status: String,
543 resolution_note: Option<String>,
544 winning_triple_id: Option<String>,
545 audit_principal: Option<String>,
546 reply: oneshot::Sender<Result<ResolveContradictionReport>>,
547 },
548}
549
/// Result of a successful `ResolveContradiction` command: the contradiction
/// key plus the values that were written.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ResolveContradictionReport {
    /// First memory id of the contradiction key.
    pub a_id: String,
    /// Second memory id of the contradiction key.
    pub b_id: String,
    /// Contradiction kind (third part of the key).
    pub kind: String,
    /// Trimmed status that was stored.
    pub status: String,
    /// Resolution timestamp in milliseconds; `Some` only when `status` is `resolved`.
    pub resolved_at_ms: Option<i64>,
    /// Trimmed note, or `None` when absent or blank.
    pub resolution_note: Option<String>,
    /// Trimmed winning triple id, or `None` when absent or blank.
    pub winning_triple_id: Option<String>,
}
565
/// Counters returned by an `AttachAbstractionBatch` command.
///
/// Field meanings are inferred from their names; most of the handler that
/// fills them lies outside this part of the file.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct AttachAbstractionBatchReport {
    /// Abstractions attached.
    pub abstractions_built: usize,
    /// Triples extracted.
    pub triples_extracted: usize,
    /// Clusters whose attach failed.
    pub clusters_failed: usize,
    /// Clusters deferred (the command carries a `clusters_deferred` input of
    /// the same name — presumably echoed back; confirm).
    pub clusters_deferred: usize,
}
583
584#[derive(Clone, Debug)]
587pub struct WriteHandle {
588 tx: mpsc::Sender<WriteCommand>,
589}
590
591impl WriteHandle {
592 pub async fn remember(
593 &self,
594 episode: Episode,
595 embedding: Embedding,
596 ) -> Result<MemoryId> {
597 self.remember_as(None, episode, embedding).await
598 }
599
600 pub async fn remember_as(
605 &self,
606 audit_principal: Option<String>,
607 episode: Episode,
608 embedding: Embedding,
609 ) -> Result<MemoryId> {
610 let (reply_tx, reply_rx) = oneshot::channel();
611 self.tx
612 .send(WriteCommand::Remember {
613 episode,
614 embedding,
615 audit_principal,
616 reply: reply_tx,
617 })
618 .await
619 .map_err(|_| Error::storage("writer task gone (channel closed)"))?;
620 reply_rx
621 .await
622 .map_err(|_| Error::storage("writer dropped reply channel"))?
623 }
624
625 pub async fn remember_batch_as(
628 &self,
629 audit_principal: Option<String>,
630 items: Vec<(Episode, Embedding)>,
631 ) -> Result<Vec<MemoryId>> {
632 let (reply_tx, reply_rx) = oneshot::channel();
633 self.tx
634 .send(WriteCommand::RememberBatch {
635 items,
636 audit_principal,
637 reply: reply_tx,
638 })
639 .await
640 .map_err(|_| Error::storage("writer task gone (channel closed)"))?;
641 reply_rx
642 .await
643 .map_err(|_| Error::storage("writer dropped reply channel"))?
644 }
645
646 pub async fn forget(&self, memory_id: MemoryId, reason: String) -> Result<()> {
647 self.forget_as(None, memory_id, reason).await
648 }
649
650 pub async fn forget_as(
652 &self,
653 audit_principal: Option<String>,
654 memory_id: MemoryId,
655 reason: String,
656 ) -> Result<()> {
657 let (reply_tx, reply_rx) = oneshot::channel();
658 self.tx
659 .send(WriteCommand::Forget {
660 memory_id,
661 reason,
662 audit_principal,
663 reply: reply_tx,
664 })
665 .await
666 .map_err(|_| Error::storage("writer task gone (channel closed)"))?;
667 reply_rx
668 .await
669 .map_err(|_| Error::storage("writer dropped reply channel"))?
670 }
671
672 pub async fn update(
673 &self,
674 memory_id: MemoryId,
675 content: String,
676 embedding: Embedding,
677 ) -> Result<MemoryUpdateReport> {
678 self.update_as(None, memory_id, content, embedding).await
679 }
680
681 pub async fn update_as(
684 &self,
685 audit_principal: Option<String>,
686 memory_id: MemoryId,
687 content: String,
688 embedding: Embedding,
689 ) -> Result<MemoryUpdateReport> {
690 let (reply_tx, reply_rx) = oneshot::channel();
691 self.tx
692 .send(WriteCommand::Update {
693 memory_id,
694 content,
695 embedding,
696 audit_principal,
697 reply: reply_tx,
698 })
699 .await
700 .map_err(|_| Error::storage("writer task gone (channel closed)"))?;
701 reply_rx
702 .await
703 .map_err(|_| Error::storage("writer dropped reply channel"))?
704 }
705
706 pub async fn ingest_document(
709 &self,
710 path: std::path::PathBuf,
711 chunk_config: crate::document::ChunkConfig,
712 ) -> Result<IngestReport> {
713 self.ingest_document_as(None, path, chunk_config).await
714 }
715
716 pub async fn ingest_document_as(
718 &self,
719 audit_principal: Option<String>,
720 path: std::path::PathBuf,
721 chunk_config: crate::document::ChunkConfig,
722 ) -> Result<IngestReport> {
723 let (reply_tx, reply_rx) = oneshot::channel();
724 self.tx
725 .send(WriteCommand::IngestDocument {
726 path,
727 chunk_config,
728 audit_principal,
729 reply: reply_tx,
730 })
731 .await
732 .map_err(|_| Error::storage("writer task gone (channel closed)"))?;
733 reply_rx
734 .await
735 .map_err(|_| Error::storage("writer dropped reply channel"))?
736 }
737
738 pub async fn forget_document(
741 &self,
742 doc_id: solo_core::DocumentId,
743 ) -> Result<ForgetDocumentReport> {
744 self.forget_document_as(None, doc_id).await
745 }
746
747 pub async fn forget_document_as(
749 &self,
750 audit_principal: Option<String>,
751 doc_id: solo_core::DocumentId,
752 ) -> Result<ForgetDocumentReport> {
753 let (reply_tx, reply_rx) = oneshot::channel();
754 self.tx
755 .send(WriteCommand::ForgetDocument {
756 doc_id,
757 audit_principal,
758 reply: reply_tx,
759 })
760 .await
761 .map_err(|_| Error::storage("writer task gone (channel closed)"))?;
762 reply_rx
763 .await
764 .map_err(|_| Error::storage("writer dropped reply channel"))?
765 }
766
767 pub async fn consolidate(&self, scope: ConsolidationScope) -> Result<ConsolidationReport> {
768 self.consolidate_as(None, scope).await
769 }
770
771 pub async fn consolidate_as(
773 &self,
774 audit_principal: Option<String>,
775 scope: ConsolidationScope,
776 ) -> Result<ConsolidationReport> {
777 let (reply_tx, reply_rx) = oneshot::channel();
778 self.tx
779 .send(WriteCommand::Consolidate {
780 scope,
781 audit_principal,
782 reply: reply_tx,
783 })
784 .await
785 .map_err(|_| Error::storage("writer task gone (channel closed)"))?;
786 reply_rx
787 .await
788 .map_err(|_| Error::storage("writer dropped reply channel"))?
789 }
790
791 pub async fn reembed(&self, scope: ReembedScope) -> Result<ReembedReport> {
792 self.reembed_as(None, scope).await
793 }
794
795 pub async fn reembed_as(
797 &self,
798 audit_principal: Option<String>,
799 scope: ReembedScope,
800 ) -> Result<ReembedReport> {
801 let (reply_tx, reply_rx) = oneshot::channel();
802 self.tx
803 .send(WriteCommand::Reembed {
804 scope,
805 audit_principal,
806 reply: reply_tx,
807 })
808 .await
809 .map_err(|_| Error::storage("writer task gone (channel closed)"))?;
810 reply_rx
811 .await
812 .map_err(|_| Error::storage("writer dropped reply channel"))?
813 }
814
815 pub async fn backup(&self, dest_path: PathBuf) -> Result<()> {
819 let (reply_tx, reply_rx) = oneshot::channel();
820 self.tx
821 .send(WriteCommand::Backup {
822 dest_path,
823 reply: reply_tx,
824 })
825 .await
826 .map_err(|_| Error::storage("writer task gone (channel closed)"))?;
827 reply_rx
828 .await
829 .map_err(|_| Error::storage("writer dropped reply channel"))?
830 }
831
832 pub async fn save_snapshot(&self) -> Result<()> {
833 let (reply_tx, reply_rx) = oneshot::channel();
834 self.tx
835 .send(WriteCommand::SaveSnapshot { reply: reply_tx })
836 .await
837 .map_err(|_| Error::storage("writer task gone (channel closed)"))?;
838 reply_rx
839 .await
840 .map_err(|_| Error::storage("writer dropped reply channel"))?
841 }
842
843 pub async fn normalize_subjects(
847 &self,
848 aliases: Vec<(String, String)>,
849 dry_run: bool,
850 ) -> Result<NormalizeReport> {
851 self.normalize_subjects_as(None, aliases, dry_run).await
852 }
853
854 pub async fn normalize_subjects_as(
856 &self,
857 audit_principal: Option<String>,
858 aliases: Vec<(String, String)>,
859 dry_run: bool,
860 ) -> Result<NormalizeReport> {
861 let (reply_tx, reply_rx) = oneshot::channel();
862 self.tx
863 .send(WriteCommand::NormalizeSubjects {
864 aliases,
865 dry_run,
866 audit_principal,
867 reply: reply_tx,
868 })
869 .await
870 .map_err(|_| Error::storage("writer task gone (channel closed)"))?;
871 reply_rx
872 .await
873 .map_err(|_| Error::storage("writer dropped reply channel"))?
874 }
875
876 pub async fn emit_llm_sampling_audit(
893 &self,
894 event: AuditEvent,
895 ) -> Result<()> {
896 let (reply_tx, reply_rx) = oneshot::channel();
897 self.tx
898 .send(WriteCommand::EmitLlmSamplingAudit {
899 event,
900 reply: reply_tx,
901 })
902 .await
903 .map_err(|_| Error::storage("writer task gone (channel closed)"))?;
904 reply_rx
905 .await
906 .map_err(|_| Error::storage("writer dropped reply channel"))?
907 }
908
909 pub async fn attach_abstraction_batch(
920 &self,
921 items: Vec<(MemoryId, solo_core::SemanticAbstraction)>,
922 episode_count: usize,
923 duration_ms: u64,
924 clusters_deferred: usize,
925 audit_principal: Option<String>,
926 ) -> Result<AttachAbstractionBatchReport> {
927 let (reply_tx, reply_rx) = oneshot::channel();
928 self.tx
929 .send(WriteCommand::AttachAbstractionBatch {
930 items,
931 episode_count,
932 duration_ms,
933 clusters_deferred,
934 audit_principal,
935 reply: reply_tx,
936 })
937 .await
938 .map_err(|_| Error::storage("writer task gone (channel closed)"))?;
939 reply_rx
940 .await
941 .map_err(|_| Error::storage("writer dropped reply channel"))?
942 }
943
944 #[allow(clippy::too_many_arguments)]
951 pub async fn resolve_contradiction_as(
952 &self,
953 audit_principal: Option<String>,
954 a_id: String,
955 b_id: String,
956 kind: String,
957 status: String,
958 resolution_note: Option<String>,
959 winning_triple_id: Option<String>,
960 ) -> Result<ResolveContradictionReport> {
961 let (reply_tx, reply_rx) = oneshot::channel();
962 self.tx
963 .send(WriteCommand::ResolveContradiction {
964 a_id,
965 b_id,
966 kind,
967 status,
968 resolution_note,
969 winning_triple_id,
970 audit_principal,
971 reply: reply_tx,
972 })
973 .await
974 .map_err(|_| Error::storage("writer task gone (channel closed)"))?;
975 reply_rx
976 .await
977 .map_err(|_| Error::storage("writer dropped reply channel"))?
978 }
979}
980
981pub struct WriterActor {
983 conn: Connection,
984 hnsw: Arc<dyn VectorIndex + Send + Sync>,
985 rx: mpsc::Receiver<WriteCommand>,
986 snapshot_dir: Option<PathBuf>,
990 embedder_id: Option<i64>,
998 embedder: Option<Arc<dyn Embedder>>,
1005 runtime_handle: Option<Handle>,
1010 steward: Option<Arc<solo_steward::Steward>>,
1023 steward_slot:
1048 Option<Arc<AsyncRwLock<Option<Arc<solo_steward::Steward>>>>>,
1049 key: Option<KeyMaterial>,
1055 redactor: Arc<crate::redaction::RedactionRegistry>,
1063 quota_bytes: Option<u64>,
1071 db_path: Option<PathBuf>,
1075 triples_batch_signal: Option<Arc<crate::triples_batch::TriplesBatchSignal>>,
1085 invalidate_tx: Option<broadcast::Sender<InvalidateEvent>>,
1096 invalidate_tenant_id: Option<String>,
1100}
1101
1102pub struct WriterSpawn {
1112 pub handle: WriteHandle,
1113 pub join: std::thread::JoinHandle<()>,
1114}
1115
1116impl WriterSpawn {
1117 pub fn shutdown_blocking(self) {
1121 drop(self.handle);
1122 if let Err(panic) = self.join.join() {
1123 tracing::error!(?panic, "solo-writer thread panicked during shutdown");
1124 }
1125 }
1126}
1127
1128impl WriterActor {
1129 pub fn spawn(
1130 conn: Connection,
1131 hnsw: Arc<dyn VectorIndex + Send + Sync>,
1132 ) -> WriterSpawn {
1133 Self::spawn_with_capacity(conn, hnsw, DEFAULT_CHANNEL_CAPACITY)
1134 }
1135
1136 pub fn spawn_with_capacity(
1137 conn: Connection,
1138 hnsw: Arc<dyn VectorIndex + Send + Sync>,
1139 capacity: usize,
1140 ) -> WriterSpawn {
1141 Self::spawn_internal(conn, hnsw, capacity, None, None, None, None, None, None, None)
1142 }
1143
1144 pub fn spawn_with_snapshot_dir(
1147 conn: Connection,
1148 hnsw: Arc<dyn VectorIndex + Send + Sync>,
1149 snapshot_dir: PathBuf,
1150 ) -> WriterSpawn {
1151 Self::spawn_internal(
1152 conn,
1153 hnsw,
1154 DEFAULT_CHANNEL_CAPACITY,
1155 Some(snapshot_dir),
1156 None,
1157 None,
1158 None,
1159 None,
1160 None,
1161 None,
1162 )
1163 }
1164
1165 pub fn spawn_full(
1169 conn: Connection,
1170 hnsw: Arc<dyn VectorIndex + Send + Sync>,
1171 snapshot_dir: PathBuf,
1172 embedder_id: i64,
1173 ) -> WriterSpawn {
1174 Self::spawn_internal(
1175 conn,
1176 hnsw,
1177 DEFAULT_CHANNEL_CAPACITY,
1178 Some(snapshot_dir),
1179 Some(embedder_id),
1180 None,
1181 None,
1182 None,
1183 None,
1184 None,
1185 )
1186 }
1187
1188 pub fn spawn_full_with_embedder(
1202 conn: Connection,
1203 hnsw: Arc<dyn VectorIndex + Send + Sync>,
1204 snapshot_dir: PathBuf,
1205 embedder_id: i64,
1206 embedder: Arc<dyn Embedder>,
1207 ) -> WriterSpawn {
1208 Self::spawn_full_with_embedder_and_optional_steward(
1209 conn,
1210 hnsw,
1211 snapshot_dir,
1212 embedder_id,
1213 embedder,
1214 None,
1215 )
1216 }
1217
1218 pub fn spawn_full_with_embedder_and_optional_steward(
1225 conn: Connection,
1226 hnsw: Arc<dyn VectorIndex + Send + Sync>,
1227 snapshot_dir: PathBuf,
1228 embedder_id: i64,
1229 embedder: Arc<dyn Embedder>,
1230 steward: Option<Arc<solo_steward::Steward>>,
1231 ) -> WriterSpawn {
1232 let handle = Handle::current();
1233 Self::spawn_internal(
1234 conn,
1235 hnsw,
1236 DEFAULT_CHANNEL_CAPACITY,
1237 Some(snapshot_dir),
1238 Some(embedder_id),
1239 Some(embedder),
1240 Some(handle),
1241 steward,
1242 None,
1243 None,
1244 )
1245 }
1246
1247 pub fn spawn_full_with_key_and_optional_steward(
1252 conn: Connection,
1253 hnsw: Arc<dyn VectorIndex + Send + Sync>,
1254 snapshot_dir: PathBuf,
1255 embedder_id: i64,
1256 embedder: Arc<dyn Embedder>,
1257 steward: Option<Arc<solo_steward::Steward>>,
1258 key: KeyMaterial,
1259 ) -> WriterSpawn {
1260 let handle = Handle::current();
1261 Self::spawn_internal(
1262 conn,
1263 hnsw,
1264 DEFAULT_CHANNEL_CAPACITY,
1265 Some(snapshot_dir),
1266 Some(embedder_id),
1267 Some(embedder),
1268 Some(handle),
1269 steward,
1270 Some(key),
1271 None,
1272 )
1273 }
1274
1275 pub fn spawn_full_with_key_steward_and_runtime(
1282 conn: Connection,
1283 hnsw: Arc<dyn VectorIndex + Send + Sync>,
1284 snapshot_dir: PathBuf,
1285 embedder_id: i64,
1286 embedder: Arc<dyn Embedder>,
1287 steward: Option<Arc<solo_steward::Steward>>,
1288 key: KeyMaterial,
1289 runtime_handle: Handle,
1290 ) -> WriterSpawn {
1291 Self::spawn_internal(
1292 conn,
1293 hnsw,
1294 DEFAULT_CHANNEL_CAPACITY,
1295 Some(snapshot_dir),
1296 Some(embedder_id),
1297 Some(embedder),
1298 Some(runtime_handle),
1299 steward,
1300 Some(key),
1301 None,
1302 )
1303 }
1304
1305 #[allow(clippy::too_many_arguments)]
1310 pub fn spawn_full_with_redactor(
1311 conn: Connection,
1312 hnsw: Arc<dyn VectorIndex + Send + Sync>,
1313 snapshot_dir: PathBuf,
1314 embedder_id: i64,
1315 embedder: Arc<dyn Embedder>,
1316 steward: Option<Arc<solo_steward::Steward>>,
1317 key: KeyMaterial,
1318 runtime_handle: Handle,
1319 redactor: Arc<crate::redaction::RedactionRegistry>,
1320 ) -> WriterSpawn {
1321 Self::spawn_internal(
1322 conn,
1323 hnsw,
1324 DEFAULT_CHANNEL_CAPACITY,
1325 Some(snapshot_dir),
1326 Some(embedder_id),
1327 Some(embedder),
1328 Some(runtime_handle),
1329 steward,
1330 Some(key),
1331 Some(redactor),
1332 )
1333 }
1334
1335 #[allow(clippy::too_many_arguments)]
1342 pub fn spawn_full_with_quota(
1343 conn: Connection,
1344 hnsw: Arc<dyn VectorIndex + Send + Sync>,
1345 snapshot_dir: PathBuf,
1346 embedder_id: i64,
1347 embedder: Arc<dyn Embedder>,
1348 steward: Option<Arc<solo_steward::Steward>>,
1349 key: KeyMaterial,
1350 runtime_handle: Handle,
1351 redactor: Arc<crate::redaction::RedactionRegistry>,
1352 quota_bytes: Option<u64>,
1353 db_path: PathBuf,
1354 ) -> WriterSpawn {
1355 Self::spawn_internal_full(
1356 conn,
1357 hnsw,
1358 DEFAULT_CHANNEL_CAPACITY,
1359 Some(snapshot_dir),
1360 Some(embedder_id),
1361 Some(embedder),
1362 Some(runtime_handle),
1363 steward,
1364 Some(key),
1365 Some(redactor),
1366 quota_bytes,
1367 Some(db_path),
1368 None,
1369 None,
1370 None,
1371 None,
1372 )
1373 }
1374
1375 #[allow(clippy::too_many_arguments)]
1392 pub fn spawn_full_with_quota_and_slot(
1393 conn: Connection,
1394 hnsw: Arc<dyn VectorIndex + Send + Sync>,
1395 snapshot_dir: PathBuf,
1396 embedder_id: i64,
1397 embedder: Arc<dyn Embedder>,
1398 steward: Option<Arc<solo_steward::Steward>>,
1399 key: KeyMaterial,
1400 runtime_handle: Handle,
1401 redactor: Arc<crate::redaction::RedactionRegistry>,
1402 quota_bytes: Option<u64>,
1403 db_path: PathBuf,
1404 steward_slot: Arc<
1405 AsyncRwLock<Option<Arc<solo_steward::Steward>>>,
1406 >,
1407 triples_batch_signal: Option<Arc<crate::triples_batch::TriplesBatchSignal>>,
1408 ) -> WriterSpawn {
1409 Self::spawn_internal_full(
1410 conn,
1411 hnsw,
1412 DEFAULT_CHANNEL_CAPACITY,
1413 Some(snapshot_dir),
1414 Some(embedder_id),
1415 Some(embedder),
1416 Some(runtime_handle),
1417 steward,
1418 Some(key),
1419 Some(redactor),
1420 quota_bytes,
1421 Some(db_path),
1422 Some(steward_slot),
1423 triples_batch_signal,
1424 None,
1425 None,
1426 )
1427 }
1428
1429 #[allow(clippy::too_many_arguments)]
1443 pub fn spawn_full_with_invalidate(
1444 conn: Connection,
1445 hnsw: Arc<dyn VectorIndex + Send + Sync>,
1446 snapshot_dir: PathBuf,
1447 embedder_id: i64,
1448 embedder: Arc<dyn Embedder>,
1449 steward: Option<Arc<solo_steward::Steward>>,
1450 key: KeyMaterial,
1451 runtime_handle: Handle,
1452 redactor: Arc<crate::redaction::RedactionRegistry>,
1453 quota_bytes: Option<u64>,
1454 db_path: PathBuf,
1455 steward_slot: Arc<
1456 AsyncRwLock<Option<Arc<solo_steward::Steward>>>,
1457 >,
1458 triples_batch_signal: Option<Arc<crate::triples_batch::TriplesBatchSignal>>,
1459 invalidate_tx: broadcast::Sender<InvalidateEvent>,
1460 invalidate_tenant_id: String,
1461 ) -> WriterSpawn {
1462 Self::spawn_internal_full(
1463 conn,
1464 hnsw,
1465 DEFAULT_CHANNEL_CAPACITY,
1466 Some(snapshot_dir),
1467 Some(embedder_id),
1468 Some(embedder),
1469 Some(runtime_handle),
1470 steward,
1471 Some(key),
1472 Some(redactor),
1473 quota_bytes,
1474 Some(db_path),
1475 Some(steward_slot),
1476 triples_batch_signal,
1477 Some(invalidate_tx),
1478 Some(invalidate_tenant_id),
1479 )
1480 }
1481
1482 #[allow(clippy::too_many_arguments)]
1483 fn spawn_internal(
1484 conn: Connection,
1485 hnsw: Arc<dyn VectorIndex + Send + Sync>,
1486 capacity: usize,
1487 snapshot_dir: Option<PathBuf>,
1488 embedder_id: Option<i64>,
1489 embedder: Option<Arc<dyn Embedder>>,
1490 runtime_handle: Option<Handle>,
1491 steward: Option<Arc<solo_steward::Steward>>,
1492 key: Option<KeyMaterial>,
1493 redactor: Option<Arc<crate::redaction::RedactionRegistry>>,
1494 ) -> WriterSpawn {
1495 Self::spawn_internal_full(
1496 conn,
1497 hnsw,
1498 capacity,
1499 snapshot_dir,
1500 embedder_id,
1501 embedder,
1502 runtime_handle,
1503 steward,
1504 key,
1505 redactor,
1506 None,
1507 None,
1508 None,
1509 None,
1510 None,
1511 None,
1512 )
1513 }
1514
1515 #[allow(clippy::too_many_arguments)]
1527 fn spawn_internal_full(
1528 conn: Connection,
1529 hnsw: Arc<dyn VectorIndex + Send + Sync>,
1530 capacity: usize,
1531 snapshot_dir: Option<PathBuf>,
1532 embedder_id: Option<i64>,
1533 embedder: Option<Arc<dyn Embedder>>,
1534 runtime_handle: Option<Handle>,
1535 steward: Option<Arc<solo_steward::Steward>>,
1536 key: Option<KeyMaterial>,
1537 redactor: Option<Arc<crate::redaction::RedactionRegistry>>,
1538 quota_bytes: Option<u64>,
1539 db_path: Option<PathBuf>,
1540 steward_slot: Option<
1541 Arc<AsyncRwLock<Option<Arc<solo_steward::Steward>>>>,
1542 >,
1543 triples_batch_signal: Option<Arc<crate::triples_batch::TriplesBatchSignal>>,
1544 invalidate_tx: Option<broadcast::Sender<InvalidateEvent>>,
1545 invalidate_tenant_id: Option<String>,
1546 ) -> WriterSpawn {
1547 let (tx, rx) = mpsc::channel(capacity);
1548 let redactor = redactor.unwrap_or_else(|| {
1549 Arc::new(
1552 crate::redaction::RedactionRegistry::from_config(
1553 &crate::config::RedactionConfig::default(),
1554 )
1555 .expect("default RedactionConfig must build a disabled registry"),
1556 )
1557 });
1558 let (invalidate_tx, invalidate_tenant_id) =
1562 match (invalidate_tx, invalidate_tenant_id) {
1563 (Some(tx), Some(tid)) => (Some(tx), Some(tid)),
1564 _ => (None, None),
1565 };
1566 let actor = Self {
1567 conn,
1568 hnsw,
1569 rx,
1570 snapshot_dir,
1571 embedder_id,
1572 embedder,
1573 runtime_handle,
1574 steward,
1575 steward_slot,
1576 triples_batch_signal,
1577 key,
1578 redactor,
1579 quota_bytes,
1580 db_path,
1581 invalidate_tx,
1582 invalidate_tenant_id,
1583 };
1584 let join = std::thread::Builder::new()
1585 .name("solo-writer".into())
1586 .spawn(move || actor.run())
1587 .expect("spawn solo-writer thread");
1588 WriterSpawn {
1589 handle: WriteHandle { tx },
1590 join,
1591 }
1592 }
1593
1594 fn run(mut self) {
1595 while let Some(cmd) = self.rx.blocking_recv() {
1596 self.dispatch(cmd);
1597 }
1598 self.shutdown();
1599 }
1600
1601 fn current_steward(&self) -> Option<Arc<solo_steward::Steward>> {
1631 if let Some(slot) = self.steward_slot.as_ref() {
1632 if let Ok(guard) = slot.try_read() {
1638 if let Some(s) = guard.as_ref() {
1639 return Some(Arc::clone(s));
1640 }
1641 }
1642 }
1643 self.steward.clone()
1644 }
1645
1646 fn dispatch(&mut self, cmd: WriteCommand) {
1647 match cmd {
1648 WriteCommand::Remember {
1649 episode,
1650 embedding,
1651 audit_principal,
1652 reply,
1653 } => self.dispatch_remember(episode, embedding, audit_principal, reply),
1654 WriteCommand::RememberBatch {
1655 items,
1656 audit_principal,
1657 reply,
1658 } => self.dispatch_remember_batch(items, audit_principal, reply),
1659 WriteCommand::Forget {
1660 memory_id,
1661 reason,
1662 audit_principal,
1663 reply,
1664 } => {
1665 let result =
1666 self.handle_forget(memory_id, reason, audit_principal.clone());
1667 let durable_ok = result.is_ok();
1670 if let Err(ref e) = result {
1671 self.emit_audit_best_effort(
1672 AuditOperation::MemoryForget,
1673 Some(memory_id.to_string()),
1674 AuditResult::Error,
1675 audit_principal,
1676 Some(serde_json::json!({ "error": e.to_string() })),
1677 );
1678 }
1679 let _ = reply.send(result);
1680 if durable_ok {
1682 self.emit_invalidate(
1683 AuditOperation::MemoryForget.as_str(),
1684 "episode",
1685 );
1686 }
1687 }
1688 WriteCommand::Update {
1689 memory_id,
1690 content,
1691 embedding,
1692 audit_principal,
1693 reply,
1694 } => {
1695 self.dispatch_update(memory_id, content, embedding, audit_principal, reply);
1696 }
1697 WriteCommand::IngestDocument {
1698 path,
1699 chunk_config,
1700 audit_principal,
1701 reply,
1702 } => {
1703 self.dispatch_ingest_document(path, chunk_config, audit_principal, reply);
1704 }
1705 WriteCommand::ForgetDocument {
1706 doc_id,
1707 audit_principal,
1708 reply,
1709 } => {
1710 let result =
1711 self.handle_forget_document(doc_id, audit_principal.clone());
1712 let durable_ok = result.is_ok();
1713 if let Err(ref e) = result {
1714 self.emit_audit_best_effort(
1715 AuditOperation::MemoryForgetDocument,
1716 Some(doc_id.to_string()),
1717 AuditResult::Error,
1718 audit_principal,
1719 Some(serde_json::json!({ "error": e.to_string() })),
1720 );
1721 }
1722 let _ = reply.send(result);
1723 if durable_ok {
1725 self.emit_invalidate(
1726 AuditOperation::MemoryForgetDocument.as_str(),
1727 "document",
1728 );
1729 }
1730 }
1731 WriteCommand::Consolidate {
1732 scope,
1733 audit_principal,
1734 reply,
1735 } => {
1736 let result = self.handle_consolidate(scope, audit_principal);
1737 let durable_ok = result.is_ok();
1738 let _ = reply.send(result);
1739 if durable_ok {
1743 self.emit_invalidate(
1744 AuditOperation::MemoryConsolidate.as_str(),
1745 "cluster",
1746 );
1747 }
1748 }
1749 WriteCommand::Reembed {
1750 scope,
1751 audit_principal,
1752 reply,
1753 } => {
1754 let result = self.handle_reembed(scope, audit_principal);
1755 let durable_ok = result.is_ok();
1756 let _ = reply.send(result);
1757 if durable_ok {
1761 self.emit_invalidate(
1762 AuditOperation::MemoryReembed.as_str(),
1763 "episode",
1764 );
1765 }
1766 }
1767 WriteCommand::SaveSnapshot { reply } => {
1768 let _ = reply.send(self.handle_save_snapshot());
1769 }
1770 WriteCommand::Backup { dest_path, reply } => {
1771 let _ = reply.send(self.handle_backup(&dest_path));
1772 }
1773 WriteCommand::NormalizeSubjects {
1774 aliases,
1775 dry_run,
1776 audit_principal,
1777 reply,
1778 } => {
1779 let result = self.handle_normalize_subjects(
1780 aliases,
1781 dry_run,
1782 audit_principal,
1783 );
1784 let durable_ok = result.is_ok() && !dry_run;
1785 let _ = reply.send(result);
1786 if durable_ok {
1789 self.emit_invalidate(
1790 AuditOperation::MemoryNormalizeSubjects.as_str(),
1791 "triple",
1792 );
1793 }
1794 }
1795 WriteCommand::EmitLlmSamplingAudit { event, reply } => {
1796 let _ = reply.send(self.handle_emit_llm_sampling_audit(event));
1797 }
1798 WriteCommand::ResolveContradiction {
1799 a_id,
1800 b_id,
1801 kind,
1802 status,
1803 resolution_note,
1804 winning_triple_id,
1805 audit_principal,
1806 reply,
1807 } => {
1808 let result = self.handle_resolve_contradiction(
1809 a_id,
1810 b_id,
1811 kind,
1812 status,
1813 resolution_note,
1814 winning_triple_id,
1815 audit_principal,
1816 );
1817 let durable_ok = result.is_ok();
1818 let _ = reply.send(result);
1819 if durable_ok {
1822 self.emit_invalidate(
1823 AuditOperation::MemoryContradictionResolve.as_str(),
1824 "contradiction",
1825 );
1826 }
1827 }
1828 WriteCommand::AttachAbstractionBatch {
1829 items,
1830 episode_count,
1831 duration_ms,
1832 clusters_deferred,
1833 audit_principal,
1834 reply,
1835 } => {
1836 let result = self.handle_attach_abstraction_batch(
1837 items,
1838 episode_count,
1839 duration_ms,
1840 clusters_deferred,
1841 audit_principal,
1842 );
1843 let durable_ok = result.is_ok();
1844 let _ = reply.send(result);
1845 if durable_ok {
1850 self.emit_invalidate(
1851 AuditOperation::MemoryTriplesExtract.as_str(),
1852 "cluster",
1853 );
1854 }
1855 }
1856 }
1857 }
1858
1859 fn handle_emit_llm_sampling_audit(&mut self, event: AuditEvent) -> Result<()> {
1873 let tx = self
1874 .conn
1875 .transaction_with_behavior(TransactionBehavior::Immediate)
1876 .map_err(|e| {
1877 Error::storage(format!(
1878 "BEGIN IMMEDIATE for llm.sampling_call audit: {e}"
1879 ))
1880 })?;
1881 insert_audit_row_in_tx(&tx, &event)?;
1882 tx.commit().map_err(|e| {
1883 Error::storage(format!("COMMIT llm.sampling_call audit: {e}"))
1884 })?;
1885 Ok(())
1886 }
1887
1888 #[allow(clippy::too_many_arguments)]
1896 fn handle_resolve_contradiction(
1897 &mut self,
1898 a_id: String,
1899 b_id: String,
1900 kind: String,
1901 status: String,
1902 resolution_note: Option<String>,
1903 winning_triple_id: Option<String>,
1904 audit_principal: Option<String>,
1905 ) -> Result<ResolveContradictionReport> {
1906 let status = status.trim().to_string();
1907 if !matches!(status.as_str(), "unresolved" | "resolved" | "reopened") {
1908 return Err(Error::invalid_input(
1909 "contradiction status must be unresolved, resolved, or reopened",
1910 ));
1911 }
1912 let note = resolution_note
1913 .map(|s| s.trim().to_string())
1914 .filter(|s| !s.is_empty());
1915 let winning = winning_triple_id
1916 .map(|s| s.trim().to_string())
1917 .filter(|s| !s.is_empty());
1918 let resolved_at_ms = if status == "resolved" {
1919 Some(chrono::Utc::now().timestamp_millis())
1920 } else {
1921 None
1922 };
1923 let now_ms = chrono::Utc::now().timestamp_millis();
1924
1925 let tx = self
1926 .conn
1927 .transaction_with_behavior(TransactionBehavior::Immediate)
1928 .map_err(|e| {
1929 Error::storage(format!(
1930 "BEGIN IMMEDIATE for resolve_contradiction: {e}"
1931 ))
1932 })?;
1933
1934 let changed = tx
1935 .execute(
1936 "UPDATE contradictions
1937 SET status = ?4,
1938 resolved_at_ms = ?5,
1939 resolution_note = ?6,
1940 winning_triple_id = ?7
1941 WHERE a_memory_id = ?1
1942 AND b_memory_id = ?2
1943 AND kind = ?3",
1944 rusqlite::params![a_id, b_id, kind, status, resolved_at_ms, note, winning],
1945 )
1946 .map_err(|e| Error::storage(format!("UPDATE contradictions: {e}")))?;
1947 if changed == 0 {
1948 return Err(Error::not_found("contradiction not found"));
1950 }
1951
1952 let target = format!("{a_id}:{b_id}:{kind}");
1954 insert_audit_row_in_tx(
1955 &tx,
1956 &AuditEvent {
1957 ts_ms: now_ms,
1958 principal_subject: audit_principal,
1959 operation: AuditOperation::MemoryContradictionResolve,
1960 target_id: Some(target),
1961 result: AuditResult::Ok,
1962 details: None,
1963 },
1964 )?;
1965
1966 tx.commit()
1967 .map_err(|e| Error::storage(format!("COMMIT resolve_contradiction: {e}")))?;
1968
1969 Ok(ResolveContradictionReport {
1970 a_id,
1971 b_id,
1972 kind,
1973 status,
1974 resolved_at_ms,
1975 resolution_note: note,
1976 winning_triple_id: winning,
1977 })
1978 }
1979
1980 fn handle_attach_abstraction_batch(
2000 &mut self,
2001 items: Vec<(MemoryId, solo_core::SemanticAbstraction)>,
2002 episode_count: usize,
2003 duration_ms: u64,
2004 clusters_deferred: usize,
2005 audit_principal: Option<String>,
2006 ) -> Result<AttachAbstractionBatchReport> {
2007 for (cluster_id, abstraction) in &items {
2011 if abstraction.cluster_id != *cluster_id {
2012 return Err(Error::Other(format!(
2013 "AttachAbstractionBatch: cluster_id mismatch on tuple \
2014 (got {} but abstraction.cluster_id is {})",
2015 cluster_id, abstraction.cluster_id
2016 )));
2017 }
2018 }
2019
2020 let now_ms = chrono::Utc::now().timestamp_millis();
2021 let tx = self
2022 .conn
2023 .transaction_with_behavior(TransactionBehavior::Immediate)
2024 .map_err(|e| {
2025 Error::storage(format!(
2026 "BEGIN IMMEDIATE for attach_abstraction_batch: {e}"
2027 ))
2028 })?;
2029
2030 let mut report = AttachAbstractionBatchReport::default();
2031 for (idx, (cluster_id, abstraction)) in items.iter().enumerate() {
2032 let prov_json = match serde_json::to_string(&abstraction.provenance) {
2033 Ok(s) => s,
2034 Err(e) => {
2035 tracing::warn!(
2036 cluster_id = %cluster_id,
2037 error = %e,
2038 "attach_abstraction_batch: serialize provenance failed; skipping cluster"
2039 );
2040 report.clusters_failed += 1;
2041 continue;
2042 }
2043 };
2044
2045 let sp_name = format!("cluster_{idx}");
2063
2064 if let Err(e) = tx.execute_batch(&format!("SAVEPOINT {sp_name};")) {
2065 tracing::warn!(
2066 cluster_id = %cluster_id,
2067 error = %e,
2068 "attach_abstraction_batch: open SAVEPOINT failed; skipping cluster"
2069 );
2070 report.clusters_failed += 1;
2071 continue;
2072 }
2073
2074 let per_cluster_res = (|| -> rusqlite::Result<()> {
2075 tx.execute(
2079 "DELETE FROM semantic_abstractions WHERE cluster_id = ?",
2080 params![cluster_id.to_string()],
2081 )?;
2082 tx.execute(
2083 "DELETE FROM triples WHERE cluster_id = ?",
2084 params![cluster_id.to_string()],
2085 )?;
2086
2087 tx.execute(
2088 "INSERT INTO semantic_abstractions
2089 (abstraction_id, cluster_id, content, provenance_json,
2090 confidence, created_at_ms)
2091 VALUES (?, ?, ?, ?, ?, ?)",
2092 params![
2093 abstraction.abstraction_id.to_string(),
2094 abstraction.cluster_id.to_string(),
2095 abstraction.content,
2096 prov_json,
2097 abstraction.confidence.0,
2098 now_ms,
2099 ],
2100 )?;
2101 for triple in &abstraction.triples {
2102 let tprov = serde_json::to_string(&triple.provenance)
2103 .unwrap_or_else(|_| "{}".to_string());
2104 let object_kind_str = match triple.object_kind {
2105 solo_core::TripleObjectKind::Entity => "entity",
2106 solo_core::TripleObjectKind::Literal => "literal",
2107 };
2108 let source_eid = resolve_source_episode_id_in_tx(
2109 &tx,
2110 &triple.provenance,
2111 );
2112 tx.execute(
2113 "INSERT INTO triples
2114 (triple_id, subject_id, predicate, object_id,
2115 object_kind, valid_from_ms, valid_to_ms,
2116 confidence, provenance_json,
2117 created_at_ms, updated_at_ms, cluster_id,
2118 source_episode_id)
2119 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
2120 params![
2121 triple.triple_id.to_string(),
2122 triple.subject_id,
2123 triple.predicate,
2124 triple.object_id,
2125 object_kind_str,
2126 triple.valid_from_ms,
2127 triple.valid_to_ms,
2128 triple.confidence.0,
2129 tprov,
2130 now_ms,
2131 now_ms,
2132 cluster_id.to_string(),
2133 source_eid,
2134 ],
2135 )?;
2136 }
2137 Ok(())
2138 })();
2139
2140 match per_cluster_res {
2141 Ok(()) => {
2142 if let Err(e) =
2145 tx.execute_batch(&format!("RELEASE SAVEPOINT {sp_name};"))
2146 {
2147 tracing::warn!(
2151 cluster_id = %cluster_id,
2152 error = %e,
2153 "attach_abstraction_batch: RELEASE SAVEPOINT failed; cluster booked as failed"
2154 );
2155 let _ = tx.execute_batch(&format!(
2156 "ROLLBACK TO SAVEPOINT {sp_name}; RELEASE SAVEPOINT {sp_name};"
2157 ));
2158 report.clusters_failed += 1;
2159 continue;
2160 }
2161 report.abstractions_built += 1;
2162 report.triples_extracted += abstraction.triples.len();
2163 }
2164 Err(e) => {
2165 tracing::warn!(
2174 cluster_id = %cluster_id,
2175 error = %e,
2176 "attach_abstraction_batch: per-cluster work failed; ROLLBACK TO SAVEPOINT"
2177 );
2178 if let Err(rb_err) = tx.execute_batch(&format!(
2179 "ROLLBACK TO SAVEPOINT {sp_name}; RELEASE SAVEPOINT {sp_name};"
2180 )) {
2181 return Err(Error::storage(format!(
2185 "ROLLBACK TO SAVEPOINT for cluster {cluster_id} \
2186 failed (rb_err={rb_err}; original cluster err={e}); \
2187 aborting entire batch"
2188 )));
2189 }
2190 report.clusters_failed += 1;
2191 }
2192 }
2193 }
2194
2195 report.clusters_deferred = clusters_deferred;
2199
2200 let audit_event = AuditEvent {
2204 ts_ms: now_ms,
2205 principal_subject: audit_principal,
2206 operation: AuditOperation::MemoryTriplesExtract,
2207 target_id: None,
2208 result: AuditResult::Ok,
2209 details: Some(serde_json::json!({
2210 "episode_count": episode_count,
2211 "cluster_count": items.len(),
2212 "abstractions_built": report.abstractions_built,
2213 "triples_extracted": report.triples_extracted,
2214 "clusters_failed": report.clusters_failed,
2215 "clusters_deferred": clusters_deferred,
2216 "duration_ms": duration_ms,
2217 })),
2218 };
2219 insert_audit_row_in_tx(&tx, &audit_event)?;
2220
2221 tx.commit().map_err(|e| {
2222 Error::storage(format!(
2223 "COMMIT attach_abstraction_batch: {e}"
2224 ))
2225 })?;
2226 Ok(report)
2227 }
2228
2229 fn dispatch_remember(
2230 &mut self,
2231 episode: Episode,
2232 embedding: Embedding,
2233 audit_principal: Option<String>,
2234 reply: oneshot::Sender<Result<MemoryId>>,
2235 ) {
2236 let memory_id = episode.memory_id;
2237 let result =
2238 self.handle_remember_durable(episode, embedding, audit_principal.clone());
2239 let durable_ok = result.is_ok();
2240 if let Err(ref e) = result {
2244 self.emit_audit_best_effort(
2245 AuditOperation::MemoryRemember,
2246 Some(memory_id.to_string()),
2247 AuditResult::Error,
2248 audit_principal,
2249 Some(serde_json::json!({ "error": e.to_string() })),
2250 );
2251 }
2252 let _ = reply.send(result);
2253
2254 if durable_ok {
2258 self.emit_invalidate(AuditOperation::MemoryRemember.as_str(), "episode");
2259 }
2260
2261 if durable_ok {
2262 if let Some(sig) = self.triples_batch_signal.as_ref() {
2269 sig.note_episode_remembered();
2270 }
2271
2272 if let Err(e) = self.conn.execute(
2273 "DELETE FROM pending_index WHERE memory_id = ?",
2274 params![memory_id.to_string()],
2275 ) {
2276 tracing::warn!(
2277 error = %e,
2278 %memory_id,
2279 "pending_index drain failed; will replay on next startup"
2280 );
2281 }
2282 }
2283 }
2284
2285 fn dispatch_update(
2286 &mut self,
2287 memory_id: MemoryId,
2288 content: String,
2289 embedding: Embedding,
2290 audit_principal: Option<String>,
2291 reply: oneshot::Sender<Result<MemoryUpdateReport>>,
2292 ) {
2293 let result =
2294 self.handle_update_durable(memory_id, content, embedding, audit_principal.clone());
2295 let durable_ok = result.is_ok();
2296 if let Err(ref e) = result {
2297 self.emit_audit_best_effort(
2298 AuditOperation::MemoryUpdate,
2299 Some(memory_id.to_string()),
2300 AuditResult::Error,
2301 audit_principal,
2302 Some(serde_json::json!({ "error": e.to_string() })),
2303 );
2304 }
2305 let memory_id_for_drain = result.as_ref().ok().map(|r| r.memory_id);
2306 let _ = reply.send(result);
2307
2308 if durable_ok {
2309 self.emit_invalidate(AuditOperation::MemoryUpdate.as_str(), "episode");
2310 if let Some(mid) = memory_id_for_drain {
2311 if let Err(e) = self.conn.execute(
2312 "DELETE FROM pending_index WHERE kind = 'episode' AND memory_id = ?",
2313 params![mid.to_string()],
2314 ) {
2315 tracing::warn!(
2316 error = %e,
2317 %mid,
2318 "pending_index drain failed (update); will replay on next startup"
2319 );
2320 }
2321 }
2322 }
2323 }
2324
2325 fn handle_update_durable(
2326 &mut self,
2327 memory_id: MemoryId,
2328 content: String,
2329 embedding: Embedding,
2330 audit_principal: Option<String>,
2331 ) -> Result<MemoryUpdateReport> {
2332 embedding.validate()?;
2333 let f32_slice = embedding.as_f32_slice().ok_or_else(|| {
2334 Error::embedder("HNSW expects F32 embeddings; convert dtype upstream")
2335 })?;
2336 let content = content.trim();
2337 if content.is_empty() {
2338 return Err(Error::invalid_input(
2339 "updated memory content must not be empty",
2340 ));
2341 }
2342
2343 let redaction = self.redactor.redact(content);
2344 let redacted_content: &str = redaction.text.as_ref();
2345 let memory_id_s = memory_id.to_string();
2346 let now_ms = chrono::Utc::now().timestamp_millis();
2347
2348 let tx = self
2349 .conn
2350 .transaction_with_behavior(TransactionBehavior::Immediate)
2351 .map_err(|e| Error::storage(format!("BEGIN IMMEDIATE for update: {e}")))?;
2352
2353 let existing: Option<(i64, String)> = tx
2354 .query_row(
2355 "SELECT rowid, status FROM episodes WHERE memory_id = ?1",
2356 params![&memory_id_s],
2357 |r| Ok((r.get(0)?, r.get(1)?)),
2358 )
2359 .optional()
2360 .map_err(|e| Error::storage(format!("SELECT episode for update: {e}")))?;
2361 let (rowid, status) =
2362 existing.ok_or_else(|| Error::not_found("memory not found"))?;
2363 if status != "active" {
2364 return Err(Error::conflict("cannot update a non-active memory"));
2365 }
2366
2367 tx.execute(
2368 "UPDATE episodes
2369 SET content = ?2,
2370 updated_at_ms = ?3
2371 WHERE memory_id = ?1",
2372 params![&memory_id_s, redacted_content, now_ms],
2373 )
2374 .map_err(|e| Error::storage(format!("UPDATE episode: {e}")))?;
2375
2376 if let Some(eid) = self.embedder_id {
2377 let dtype_str = match embedding.dtype {
2378 solo_core::EmbeddingDtype::F32 => "f32",
2379 solo_core::EmbeddingDtype::F16 => "f16",
2380 solo_core::EmbeddingDtype::I8 => "i8",
2381 solo_core::EmbeddingDtype::Binary => "binary",
2382 };
2383 tx.execute(
2384 "INSERT INTO embeddings (memory_id, embedder_id, dtype, dim, vector, created_at_ms)
2385 VALUES (?1, ?2, ?3, ?4, ?5, ?6)
2386 ON CONFLICT(memory_id, embedder_id)
2387 DO UPDATE SET dtype = excluded.dtype,
2388 dim = excluded.dim,
2389 vector = excluded.vector,
2390 created_at_ms = excluded.created_at_ms",
2391 params![
2392 &memory_id_s,
2393 eid,
2394 dtype_str,
2395 embedding.dim as i64,
2396 &embedding.data[..],
2397 now_ms
2398 ],
2399 )
2400 .map_err(|e| Error::storage(format!("UPSERT embedding: {e}")))?;
2401 }
2402
2403 tx.execute(
2404 "INSERT INTO pending_index (kind, memory_id, embedding, embedding_dim, enqueued_at)
2405 VALUES ('episode', ?1, ?2, ?3, ?4)
2406 ON CONFLICT(memory_id)
2407 DO UPDATE SET kind = 'episode',
2408 chunk_id = NULL,
2409 embedding = excluded.embedding,
2410 embedding_dim = excluded.embedding_dim,
2411 enqueued_at = excluded.enqueued_at",
2412 params![&memory_id_s, &embedding.data[..], embedding.dim as i64, now_ms],
2413 )
2414 .map_err(|e| Error::storage(format!("UPSERT pending_index: {e}")))?;
2415
2416 if !redaction.matches.is_empty() {
2417 insert_audit_row_in_tx(
2418 &tx,
2419 &redaction_audit_event(
2420 now_ms,
2421 audit_principal.clone(),
2422 Some(memory_id.to_string()),
2423 &redaction.matches,
2424 ),
2425 )?;
2426 }
2427
2428 insert_audit_row_in_tx(
2429 &tx,
2430 &AuditEvent {
2431 ts_ms: now_ms,
2432 principal_subject: audit_principal,
2433 operation: AuditOperation::MemoryUpdate,
2434 target_id: Some(memory_id.to_string()),
2435 result: AuditResult::Ok,
2436 details: None,
2437 },
2438 )?;
2439
2440 tx.commit()
2441 .map_err(|e| Error::storage(format!("COMMIT update: {e}")))?;
2442
2443 self.hnsw.add(episode_hnsw_id(rowid), f32_slice)?;
2444
2445 Ok(MemoryUpdateReport {
2446 memory_id,
2447 rowid,
2448 content: redacted_content.to_string(),
2449 updated_at_ms: now_ms,
2450 })
2451 }
2452
2453 fn handle_remember_durable(
2454 &mut self,
2455 episode: Episode,
2456 embedding: Embedding,
2457 audit_principal: Option<String>,
2458 ) -> Result<MemoryId> {
2459 embedding.validate()?;
2460 let memory_id = episode.memory_id;
2461
2462 let estimated_growth: u64 = (episode.content.len() as u64)
2470 .saturating_add(embedding.data.len() as u64)
2471 .saturating_add(2048);
2476 match check_quota(
2477 self.quota_bytes,
2478 self.db_path.as_deref(),
2479 estimated_growth,
2480 ) {
2481 QuotaDecision::Unlimited | QuotaDecision::Allowed { .. } => {}
2482 QuotaDecision::Exceeded {
2483 current_size,
2484 estimated_growth,
2485 quota,
2486 } => {
2487 let err = QuotaExceededError {
2493 current_size,
2494 estimated_growth,
2495 quota,
2496 };
2497 self.emit_audit_best_effort(
2504 AuditOperation::MemoryRemember,
2505 Some(memory_id.to_string()),
2506 AuditResult::Forbidden,
2507 audit_principal,
2508 Some(err.to_details_json()),
2509 );
2510 return Err(Error::forbidden(err.to_string()));
2511 }
2512 }
2513
2514 let redaction = self.redactor.redact(&episode.content);
2526 let redacted_content: &str = redaction.text.as_ref();
2527
2528 let tx = self
2529 .conn
2530 .transaction_with_behavior(TransactionBehavior::Immediate)
2531 .map_err(|e| Error::storage(format!("BEGIN IMMEDIATE for remember: {e}")))?;
2532
2533 let now_ms = chrono::Utc::now().timestamp_millis();
2534 let encoding_ctx = serde_json::to_string(&episode.encoding_context)
2535 .map_err(|e| Error::storage(format!("serialize encoding_context: {e}")))?;
2536 let provenance_json = match &episode.provenance {
2537 Some(p) => Some(
2538 serde_json::to_string(p)
2539 .map_err(|e| Error::storage(format!("serialize provenance: {e}")))?,
2540 ),
2541 None => None,
2542 };
2543 let tier_str = match episode.tier {
2544 Tier::Hot => "hot",
2545 Tier::Warm => "warm",
2546 Tier::Cold => "cold",
2547 };
2548
2549 tx.execute(
2550 "INSERT INTO episodes (
2551 memory_id, ts_ms, source_type, source_id, content,
2552 encoding_context_json, provenance_json, confidence,
2553 strength, salience, tier, created_at_ms, updated_at_ms,
2554 principal_subject
2555 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
2556 params![
2557 memory_id.to_string(),
2558 episode.ts_ms,
2559 episode.source_type,
2560 episode.source_id,
2561 redacted_content,
2562 encoding_ctx,
2563 provenance_json,
2564 episode.confidence.0,
2565 episode.strength,
2566 episode.salience,
2567 tier_str,
2568 now_ms,
2569 now_ms,
2570 audit_principal.as_deref(),
2571 ],
2572 )
2573 .map_err(|e| Error::storage(format!("INSERT episode: {e}")))?;
2574
2575 let rowid = tx.last_insert_rowid();
2576
2577 if let Some(eid) = self.embedder_id {
2589 let dtype_str = match embedding.dtype {
2590 solo_core::EmbeddingDtype::F32 => "f32",
2591 solo_core::EmbeddingDtype::F16 => "f16",
2592 solo_core::EmbeddingDtype::I8 => "i8",
2593 solo_core::EmbeddingDtype::Binary => "binary",
2594 };
2595 tx.execute(
2596 "INSERT INTO embeddings (
2597 memory_id, embedder_id, dtype, dim, vector, created_at_ms
2598 ) VALUES (?, ?, ?, ?, ?, ?)",
2599 params![
2600 memory_id.to_string(),
2601 eid,
2602 dtype_str,
2603 embedding.dim as i64,
2604 &embedding.data[..],
2605 now_ms,
2606 ],
2607 )
2608 .map_err(|e| Error::storage(format!("INSERT embeddings: {e}")))?;
2609 }
2610
2611 tx.execute(
2612 "INSERT INTO pending_index (
2613 memory_id, embedding, embedding_dim, enqueued_at
2614 ) VALUES (?, ?, ?, ?)",
2615 params![
2616 memory_id.to_string(),
2617 &embedding.data[..],
2618 embedding.dim as i64,
2619 now_ms,
2620 ],
2621 )
2622 .map_err(|e| Error::storage(format!("INSERT pending_index: {e}")))?;
2623
2624 if !redaction.matches.is_empty() {
2629 insert_audit_row_in_tx(
2630 &tx,
2631 &redaction_audit_event(
2632 now_ms,
2633 audit_principal.clone(),
2634 Some(memory_id.to_string()),
2635 &redaction.matches,
2636 ),
2637 )?;
2638 }
2639
2640 insert_audit_row_in_tx(
2644 &tx,
2645 &AuditEvent {
2646 ts_ms: now_ms,
2647 principal_subject: audit_principal,
2648 operation: AuditOperation::MemoryRemember,
2649 target_id: Some(memory_id.to_string()),
2650 result: AuditResult::Ok,
2651 details: None,
2652 },
2653 )?;
2654
2655 tx.commit()
2656 .map_err(|e| Error::storage(format!("COMMIT remember: {e}")))?;
2657
2658 let f32_slice = embedding.as_f32_slice().ok_or_else(|| {
2659 Error::embedder("HNSW expects F32 embeddings; convert dtype upstream")
2660 })?;
2661 self.hnsw.add(episode_hnsw_id(rowid), f32_slice)?;
2664
2665 Ok(memory_id)
2666 }
2667
2668 fn dispatch_remember_batch(
2674 &mut self,
2675 items: Vec<(Episode, Embedding)>,
2676 audit_principal: Option<String>,
2677 reply: oneshot::Sender<Result<Vec<MemoryId>>>,
2678 ) {
2679 let item_count = items.len();
2680
2681 let result = self.handle_remember_batch_durable(items, audit_principal.clone());
2682
2683 let (forwarded, drain_ids): (Result<Vec<MemoryId>>, Vec<MemoryId>) = match result {
2690 Ok((ids, hnsw_ok_mask)) => {
2691 let drain: Vec<MemoryId> = ids
2692 .iter()
2693 .zip(hnsw_ok_mask.iter().copied())
2694 .filter_map(|(mid, ok)| if ok { Some(*mid) } else { None })
2695 .collect();
2696 (Ok(ids), drain)
2697 }
2698 Err(e) => (Err(e), Vec::new()),
2699 };
2700 let durable_ok = forwarded.is_ok();
2701
2702 if let Err(ref e) = forwarded {
2706 self.emit_audit_best_effort(
2707 AuditOperation::MemoryRememberBatch,
2708 None,
2709 AuditResult::Error,
2710 audit_principal,
2711 Some(serde_json::json!({
2712 "error": e.to_string(),
2713 "item_count": item_count,
2714 })),
2715 );
2716 }
2717 let _ = reply.send(forwarded);
2718
2719 if durable_ok {
2720 self.emit_invalidate(AuditOperation::MemoryRememberBatch.as_str(), "episode");
2723
2724 if let Some(sig) = self.triples_batch_signal.as_ref() {
2728 for _ in 0..item_count {
2729 sig.note_episode_remembered();
2730 }
2731 }
2732
2733 for mid in &drain_ids {
2737 if let Err(e) = self.conn.execute(
2738 "DELETE FROM pending_index WHERE memory_id = ?",
2739 params![mid.to_string()],
2740 ) {
2741 tracing::warn!(
2742 error = %e,
2743 %mid,
2744 "pending_index drain failed (batch); will replay on next startup"
2745 );
2746 }
2747 }
2748 }
2749 }
2750
2751 fn handle_remember_batch_durable(
2776 &mut self,
2777 items: Vec<(Episode, Embedding)>,
2778 audit_principal: Option<String>,
2779 ) -> Result<(Vec<MemoryId>, Vec<bool>)> {
2780 if items.is_empty() {
2782 return Err(Error::invalid_input(
2783 "memory_remember_batch: items must not be empty".to_string(),
2784 ));
2785 }
2786 if items.len() > MAX_REMEMBER_BATCH_SIZE {
2787 return Err(Error::invalid_input(format!(
2788 "memory_remember_batch: {} items exceeds MAX_REMEMBER_BATCH_SIZE = {}",
2789 items.len(),
2790 MAX_REMEMBER_BATCH_SIZE,
2791 )));
2792 }
2793
2794 for (_, embedding) in &items {
2797 embedding.validate()?;
2798 }
2799
2800 let mut total_growth: u64 = 0;
2802 for (episode, embedding) in &items {
2803 total_growth = total_growth.saturating_add(
2804 (episode.content.len() as u64)
2805 .saturating_add(embedding.data.len() as u64)
2806 .saturating_add(2048),
2807 );
2808 }
2809 match check_quota(self.quota_bytes, self.db_path.as_deref(), total_growth) {
2810 QuotaDecision::Unlimited | QuotaDecision::Allowed { .. } => {}
2811 QuotaDecision::Exceeded {
2812 current_size,
2813 estimated_growth,
2814 quota,
2815 } => {
2816 let err = QuotaExceededError {
2817 current_size,
2818 estimated_growth,
2819 quota,
2820 };
2821 self.emit_audit_best_effort(
2822 AuditOperation::MemoryRememberBatch,
2823 None,
2824 AuditResult::Forbidden,
2825 audit_principal,
2826 Some(err.to_details_json()),
2827 );
2828 return Err(Error::forbidden(err.to_string()));
2829 }
2830 }
2831
2832 let tx = self
2834 .conn
2835 .transaction_with_behavior(TransactionBehavior::Immediate)
2836 .map_err(|e| Error::storage(format!("BEGIN IMMEDIATE for remember_batch: {e}")))?;
2837
2838 let now_ms = chrono::Utc::now().timestamp_millis();
2839 let mut memory_ids: Vec<MemoryId> = Vec::with_capacity(items.len());
2840 let mut rowids: Vec<i64> = Vec::with_capacity(items.len());
2841
2842 for (episode, embedding) in &items {
2844 let memory_id = episode.memory_id;
2845
2846 let redaction = self.redactor.redact(&episode.content);
2850 let redacted_content: &str = redaction.text.as_ref();
2851
2852 let encoding_ctx = serde_json::to_string(&episode.encoding_context)
2853 .map_err(|e| Error::storage(format!("serialize encoding_context: {e}")))?;
2854 let provenance_json = match &episode.provenance {
2855 Some(p) => Some(
2856 serde_json::to_string(p)
2857 .map_err(|e| Error::storage(format!("serialize provenance: {e}")))?,
2858 ),
2859 None => None,
2860 };
2861 let tier_str = match episode.tier {
2862 Tier::Hot => "hot",
2863 Tier::Warm => "warm",
2864 Tier::Cold => "cold",
2865 };
2866
2867 tx.execute(
2868 "INSERT INTO episodes (
2869 memory_id, ts_ms, source_type, source_id, content,
2870 encoding_context_json, provenance_json, confidence,
2871 strength, salience, tier, created_at_ms, updated_at_ms,
2872 principal_subject
2873 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
2874 params![
2875 memory_id.to_string(),
2876 episode.ts_ms,
2877 episode.source_type,
2878 episode.source_id,
2879 redacted_content,
2880 encoding_ctx,
2881 provenance_json,
2882 episode.confidence.0,
2883 episode.strength,
2884 episode.salience,
2885 tier_str,
2886 now_ms,
2887 now_ms,
2888 audit_principal.as_deref(),
2889 ],
2890 )
2891 .map_err(|e| Error::storage(format!("INSERT episode (batch): {e}")))?;
2892
2893 let rowid = tx.last_insert_rowid();
2894
2895 if let Some(eid) = self.embedder_id {
2896 let dtype_str = match embedding.dtype {
2897 solo_core::EmbeddingDtype::F32 => "f32",
2898 solo_core::EmbeddingDtype::F16 => "f16",
2899 solo_core::EmbeddingDtype::I8 => "i8",
2900 solo_core::EmbeddingDtype::Binary => "binary",
2901 };
2902 tx.execute(
2903 "INSERT INTO embeddings (
2904 memory_id, embedder_id, dtype, dim, vector, created_at_ms
2905 ) VALUES (?, ?, ?, ?, ?, ?)",
2906 params![
2907 memory_id.to_string(),
2908 eid,
2909 dtype_str,
2910 embedding.dim as i64,
2911 &embedding.data[..],
2912 now_ms,
2913 ],
2914 )
2915 .map_err(|e| Error::storage(format!("INSERT embeddings (batch): {e}")))?;
2916 }
2917
2918 tx.execute(
2919 "INSERT INTO pending_index (
2920 memory_id, embedding, embedding_dim, enqueued_at
2921 ) VALUES (?, ?, ?, ?)",
2922 params![
2923 memory_id.to_string(),
2924 &embedding.data[..],
2925 embedding.dim as i64,
2926 now_ms,
2927 ],
2928 )
2929 .map_err(|e| Error::storage(format!("INSERT pending_index (batch): {e}")))?;
2930
2931 if !redaction.matches.is_empty() {
2935 insert_audit_row_in_tx(
2936 &tx,
2937 &redaction_audit_event(
2938 now_ms,
2939 audit_principal.clone(),
2940 Some(memory_id.to_string()),
2941 &redaction.matches,
2942 ),
2943 )?;
2944 }
2945
2946 memory_ids.push(memory_id);
2947 rowids.push(rowid);
2948 }
2949
2950 insert_audit_row_in_tx(
2953 &tx,
2954 &AuditEvent {
2955 ts_ms: now_ms,
2956 principal_subject: audit_principal,
2957 operation: AuditOperation::MemoryRememberBatch,
2958 target_id: None,
2959 result: AuditResult::Ok,
2960 details: Some(serde_json::json!({
2961 "item_count": items.len(),
2962 })),
2963 },
2964 )?;
2965
2966 tx.commit()
2968 .map_err(|e| Error::storage(format!("COMMIT remember_batch: {e}")))?;
2969
2970 let mut hnsw_ok_mask: Vec<bool> = Vec::with_capacity(items.len());
2979 for ((episode, embedding), rowid) in items.iter().zip(rowids.iter()) {
2980 let f32_slice = match embedding.as_f32_slice() {
2981 Some(s) => s,
2982 None => {
2983 tracing::warn!(
2984 memory_id = %episode.memory_id,
2985 "remember_batch: embedding not F32 — HNSW add skipped; pending_index row left for replay"
2986 );
2987 hnsw_ok_mask.push(false);
2988 continue;
2989 }
2990 };
2991 match self.hnsw.add(episode_hnsw_id(*rowid), f32_slice) {
2992 Ok(()) => hnsw_ok_mask.push(true),
2993 Err(e) => {
2994 tracing::warn!(
2995 error = %e,
2996 memory_id = %episode.memory_id,
2997 "remember_batch: hnsw.add failed; pending_index row left for replay"
2998 );
2999 hnsw_ok_mask.push(false);
3000 }
3001 }
3002 }
3003
3004 Ok((memory_ids, hnsw_ok_mask))
3005 }
3006
3007 fn handle_forget(
3008 &mut self,
3009 memory_id: MemoryId,
3010 reason: String,
3011 audit_principal: Option<String>,
3012 ) -> Result<()> {
3013 let now_ms = chrono::Utc::now().timestamp_millis();
3032 let id_str = memory_id.to_string();
3033
3034 let rowid: Option<i64> = self
3039 .conn
3040 .query_row(
3041 "SELECT rowid FROM episodes WHERE memory_id = ?",
3042 params![&id_str],
3043 |r| r.get::<_, i64>(0),
3044 )
3045 .optional()
3046 .map_err(|e| Error::storage(format!("lookup rowid for forget: {e}")))?;
3047 let Some(rowid) = rowid else {
3048 return Err(Error::not_found(format!(
3049 "memory_id {memory_id} not found in episodes"
3050 )));
3051 };
3052
3053 let tx = self
3056 .conn
3057 .transaction_with_behavior(TransactionBehavior::Immediate)
3058 .map_err(|e| Error::storage(format!("BEGIN IMMEDIATE for forget: {e}")))?;
3059
3060 let updated = tx
3061 .execute(
3062 "UPDATE episodes
3063 SET status = 'forgotten', updated_at_ms = ?
3064 WHERE memory_id = ? AND status <> 'forgotten'",
3065 params![now_ms, &id_str],
3066 )
3067 .map_err(|e| Error::storage(format!("UPDATE episodes for forget: {e}")))?;
3068
3069 insert_audit_row_in_tx(
3070 &tx,
3071 &AuditEvent {
3072 ts_ms: now_ms,
3073 principal_subject: audit_principal,
3074 operation: AuditOperation::MemoryForget,
3075 target_id: Some(id_str.clone()),
3076 result: AuditResult::Ok,
3077 details: Some(serde_json::json!({ "reason": reason })),
3078 },
3079 )?;
3080
3081 tx.commit()
3082 .map_err(|e| Error::storage(format!("COMMIT forget: {e}")))?;
3083
3084 if let Err(e) = self.hnsw.remove(episode_hnsw_id(rowid)) {
3088 tracing::warn!(
3089 error = %e,
3090 %memory_id,
3091 "hnsw.remove during forget failed (non-fatal; SQL filter still hides the row)"
3092 );
3093 }
3094
3095 if updated == 0 {
3096 tracing::debug!(%memory_id, "forget called on already-forgotten episode (idempotent)");
3098 return Ok(());
3099 }
3100 tracing::info!(%memory_id, %reason, "episode soft-deleted (status=forgotten)");
3101 Ok(())
3102 }
3103
3104 fn dispatch_ingest_document(
3120 &mut self,
3121 path: std::path::PathBuf,
3122 chunk_config: crate::document::ChunkConfig,
3123 audit_principal: Option<String>,
3124 reply: oneshot::Sender<Result<IngestReport>>,
3125 ) {
3126 let (result, drained_chunks) = self.handle_ingest_document_durable(
3130 path,
3131 chunk_config,
3132 audit_principal.clone(),
3133 );
3134 let durable_ok = result.is_ok();
3135 if let Err(ref e) = result {
3138 self.emit_audit_best_effort(
3139 AuditOperation::MemoryIngestDocument,
3140 None,
3141 AuditResult::Error,
3142 audit_principal,
3143 Some(serde_json::json!({ "error": e.to_string() })),
3144 );
3145 }
3146 let _ = reply.send(result);
3147
3148 if durable_ok {
3154 self.emit_invalidate(
3155 AuditOperation::MemoryIngestDocument.as_str(),
3156 "document",
3157 );
3158 }
3159
3160 if durable_ok && !drained_chunks.is_empty() {
3161 for chunk_id in &drained_chunks {
3162 if let Err(e) = self.conn.execute(
3163 "DELETE FROM pending_index WHERE kind = 'chunk' AND chunk_id = ?",
3164 params![chunk_id.to_string()],
3165 ) {
3166 tracing::warn!(
3167 error = %e,
3168 %chunk_id,
3169 "pending_index drain (chunk) failed; will replay on next startup"
3170 );
3171 }
3172 }
3173 }
3174 }
3175
3176 fn handle_ingest_document_durable(
3193 &mut self,
3194 path: std::path::PathBuf,
3195 chunk_config: crate::document::ChunkConfig,
3196 audit_principal: Option<String>,
3197 ) -> (Result<IngestReport>, Vec<solo_core::ChunkId>) {
3198 let file_size: u64 = match std::fs::metadata(&path) {
3214 Ok(meta) => {
3215 let size = meta.len();
3216 if let Some(cap) = resolve_ingest_max_bytes() {
3217 if size > cap {
3218 return (
3219 Err(Error::storage(format!(
3220 "ingest_document: file {} is {size} bytes, exceeds \
3221 SOLO_INGEST_MAX_BYTES cap of {cap} bytes. Set \
3222 SOLO_INGEST_MAX_BYTES=<larger> to override, or \
3223 SOLO_INGEST_MAX_BYTES=0 to disable the cap.",
3224 path.display()
3225 ))),
3226 Vec::new(),
3227 );
3228 }
3229 }
3230 size
3231 }
3232 Err(e) => {
3233 return (
3234 Err(Error::storage(format!(
3235 "ingest_document: stat {}: {e}",
3236 path.display()
3237 ))),
3238 Vec::new(),
3239 );
3240 }
3241 };
3242
3243 let ingest_growth_estimate: u64 = file_size.saturating_mul(2);
3252 match check_quota(
3253 self.quota_bytes,
3254 self.db_path.as_deref(),
3255 ingest_growth_estimate,
3256 ) {
3257 QuotaDecision::Unlimited | QuotaDecision::Allowed { .. } => {}
3258 QuotaDecision::Exceeded {
3259 current_size,
3260 estimated_growth,
3261 quota,
3262 } => {
3263 let err = QuotaExceededError {
3264 current_size,
3265 estimated_growth,
3266 quota,
3267 };
3268 self.emit_audit_best_effort(
3269 AuditOperation::MemoryIngestDocument,
3270 Some(path.display().to_string()),
3271 AuditResult::Forbidden,
3272 audit_principal,
3273 Some(err.to_details_json()),
3274 );
3275 return (Err(Error::forbidden(err.to_string())), Vec::new());
3276 }
3277 }
3278
3279 let parsed = match crate::document::parse_file(&path) {
3281 Ok(p) => p,
3282 Err(e) => {
3283 return (
3284 Err(Error::storage(format!(
3285 "ingest_document: parse {}: {e}",
3286 path.display()
3287 ))),
3288 Vec::new(),
3289 );
3290 }
3291 };
3292 let chunks = crate::document::chunk_text(&parsed.text, &chunk_config);
3293
3294 let content_hash = {
3296 use sha2::{Digest, Sha256};
3297 let mut hasher = Sha256::new();
3298 hasher.update(parsed.text.as_bytes());
3299 hex::encode(hasher.finalize())
3300 };
3301
3302 let existing_doc: Option<String> = match self
3303 .conn
3304 .query_row(
3305 "SELECT doc_id FROM documents WHERE content_hash = ? LIMIT 1",
3306 params![&content_hash],
3307 |r| r.get::<_, String>(0),
3308 )
3309 .optional()
3310 {
3311 Ok(v) => v,
3312 Err(e) => {
3313 return (
3314 Err(Error::storage(format!(
3315 "ingest_document: dedup lookup: {e}"
3316 ))),
3317 Vec::new(),
3318 );
3319 }
3320 };
3321 if let Some(doc_id_s) = existing_doc {
3322 let doc_id = match solo_core::DocumentId::from_str(&doc_id_s) {
3323 Ok(id) => id,
3324 Err(e) => {
3325 return (
3326 Err(Error::storage(format!(
3327 "ingest_document: parse existing doc_id `{doc_id_s}`: {e}"
3328 ))),
3329 Vec::new(),
3330 );
3331 }
3332 };
3333 tracing::info!(
3334 %doc_id,
3335 content_hash = %content_hash,
3336 "ingest_document: dedup hit; returning existing doc_id"
3337 );
3338 self.emit_audit_best_effort(
3343 AuditOperation::MemoryIngestDocument,
3344 Some(doc_id.to_string()),
3345 AuditResult::Ok,
3346 audit_principal.clone(),
3347 Some(serde_json::json!({ "deduped": true })),
3348 );
3349 return (
3350 Ok(IngestReport {
3351 doc_id,
3352 chunks_persisted: 0,
3353 bytes_ingested: parsed.byte_size,
3354 deduped: true,
3355 }),
3356 Vec::new(),
3357 );
3358 }
3359
3360 let embedder = match self.embedder.clone() {
3363 Some(e) => e,
3364 None => {
3365 return (
3366 Err(Error::Other(
3367 "ingest_document: writer has no embedder \
3368 (use spawn_full_with_embedder)"
3369 .into(),
3370 )),
3371 Vec::new(),
3372 );
3373 }
3374 };
3375 let runtime = match self.runtime_handle.clone() {
3376 Some(r) => r,
3377 None => {
3378 return (
3379 Err(Error::Other(
3380 "ingest_document: writer has no runtime handle \
3381 (use spawn_full_with_embedder)"
3382 .into(),
3383 )),
3384 Vec::new(),
3385 );
3386 }
3387 };
3388 let embedder_id = match self.embedder_id {
3389 Some(id) => id,
3390 None => {
3391 return (
3392 Err(Error::Other(
3393 "ingest_document: writer has no embedder_id \
3394 (use spawn_full_with_embedder)"
3395 .into(),
3396 )),
3397 Vec::new(),
3398 );
3399 }
3400 };
3401
3402 if chunks.is_empty() {
3405 return (
3406 Err(Error::storage(
3407 "ingest_document: parser returned text but chunker produced 0 chunks",
3408 )),
3409 Vec::new(),
3410 );
3411 }
3412
3413 let mut redacted_chunks: Vec<crate::document::ChunkSpec> = Vec::with_capacity(chunks.len());
3419 let mut redaction_match_counts: std::collections::HashMap<String, u32> =
3420 std::collections::HashMap::new();
3421 for spec in &chunks {
3422 let result = self.redactor.redact(&spec.content);
3423 for m in &result.matches {
3424 *redaction_match_counts.entry(m.pattern_name.clone()).or_insert(0) += m.count;
3425 }
3426 let new_content = match result.text {
3427 std::borrow::Cow::Borrowed(_) => spec.content.clone(),
3428 std::borrow::Cow::Owned(s) => s,
3429 };
3430 redacted_chunks.push(crate::document::ChunkSpec {
3431 content: new_content,
3432 token_count: spec.token_count,
3433 start_offset: spec.start_offset,
3434 end_offset: spec.end_offset,
3435 });
3436 }
3437 let chunks = redacted_chunks;
3438
3439 let texts: Vec<&str> = chunks.iter().map(|c| c.content.as_str()).collect();
3445 let embeddings = match runtime.block_on(embedder.embed_batch(&texts)) {
3446 Ok(v) => v,
3447 Err(e) => {
3448 return (
3449 Err(Error::storage(format!(
3450 "ingest_document: embed_batch failed: {e}"
3451 ))),
3452 Vec::new(),
3453 );
3454 }
3455 };
3456 if embeddings.len() != chunks.len() {
3457 return (
3458 Err(Error::storage(format!(
3459 "ingest_document: embed_batch returned {} embeddings for {} chunks",
3460 embeddings.len(),
3461 chunks.len()
3462 ))),
3463 Vec::new(),
3464 );
3465 }
3466 for (i, emb) in embeddings.iter().enumerate() {
3468 if let Err(e) = emb.validate() {
3469 return (
3470 Err(Error::storage(format!(
3471 "ingest_document: chunk {i} embedding invalid: {e}"
3472 ))),
3473 Vec::new(),
3474 );
3475 }
3476 }
3477
3478 let dtype_str = match embedder.dtype() {
3480 solo_core::EmbeddingDtype::F32 => "f32",
3481 solo_core::EmbeddingDtype::F16 => "f16",
3482 solo_core::EmbeddingDtype::I8 => "i8",
3483 solo_core::EmbeddingDtype::Binary => "binary",
3484 };
3485
3486 let doc_id = solo_core::DocumentId::new();
3488 let now_ms = chrono::Utc::now().timestamp_millis();
3489 let modified_at_ms: Option<i64> = std::fs::metadata(&path)
3490 .ok()
3491 .and_then(|m| m.modified().ok())
3492 .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
3493 .map(|d| d.as_millis() as i64);
3494
3495 let title: String = derive_document_title(&parsed.text, &path);
3499 let source: String = path.to_string_lossy().to_string();
3500
3501 let chunk_count = chunks.len() as u32;
3502
3503 let mut chunk_records: Vec<(solo_core::ChunkId, i64, solo_core::Embedding)> =
3508 Vec::with_capacity(chunks.len());
3509
3510 let tx = match self
3511 .conn
3512 .transaction_with_behavior(TransactionBehavior::Immediate)
3513 {
3514 Ok(t) => t,
3515 Err(e) => {
3516 return (
3517 Err(Error::storage(format!(
3518 "ingest_document: BEGIN IMMEDIATE: {e}"
3519 ))),
3520 Vec::new(),
3521 );
3522 }
3523 };
3524
3525 if let Err(e) = tx.execute(
3527 "INSERT INTO documents (
3528 doc_id, source, title, mime_type,
3529 ingested_at_ms, modified_at_ms, status,
3530 chunk_count, content_hash, byte_size
3531 ) VALUES (?, ?, ?, ?, ?, ?, 'active', ?, ?, ?)",
3532 params![
3533 doc_id.to_string(),
3534 source,
3535 title,
3536 parsed.mime_type,
3537 now_ms,
3538 modified_at_ms,
3539 chunk_count as i64,
3540 content_hash,
3541 parsed.byte_size as i64,
3542 ],
3543 ) {
3544 return (
3545 Err(Error::storage(format!(
3546 "ingest_document: INSERT documents: {e}"
3547 ))),
3548 Vec::new(),
3549 );
3550 }
3551
3552 for (idx, (spec, embedding)) in chunks.iter().zip(embeddings.iter()).enumerate() {
3554 let chunk_id = solo_core::ChunkId::new();
3555 if let Err(e) = tx.execute(
3556 "INSERT INTO document_chunks (
3557 chunk_id, doc_id, chunk_index, content,
3558 token_count, start_offset, end_offset, created_at_ms,
3559 ingested_by_principal
3560 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
3561 params![
3562 chunk_id.to_string(),
3563 doc_id.to_string(),
3564 idx as i64,
3565 spec.content,
3566 spec.token_count as i64,
3567 spec.start_offset as i64,
3568 spec.end_offset as i64,
3569 now_ms,
3570 audit_principal.as_deref(),
3571 ],
3572 ) {
3573 return (
3574 Err(Error::storage(format!(
3575 "ingest_document: INSERT document_chunks (idx {idx}): {e}"
3576 ))),
3577 Vec::new(),
3578 );
3579 }
3580 let rowid = tx.last_insert_rowid();
3581
3582 if let Err(e) = tx.execute(
3583 "INSERT INTO chunk_embeddings (
3584 chunk_id, embedder_id, dtype, dim, vector, created_at_ms
3585 ) VALUES (?, ?, ?, ?, ?, ?)",
3586 params![
3587 chunk_id.to_string(),
3588 embedder_id,
3589 dtype_str,
3590 embedding.dim as i64,
3591 &embedding.data[..],
3592 now_ms,
3593 ],
3594 ) {
3595 return (
3596 Err(Error::storage(format!(
3597 "ingest_document: INSERT chunk_embeddings (idx {idx}): {e}"
3598 ))),
3599 Vec::new(),
3600 );
3601 }
3602
3603 if let Err(e) = tx.execute(
3604 "INSERT INTO pending_index (
3605 kind, chunk_id, embedding, embedding_dim, enqueued_at
3606 ) VALUES ('chunk', ?, ?, ?, ?)",
3607 params![
3608 chunk_id.to_string(),
3609 &embedding.data[..],
3610 embedding.dim as i64,
3611 now_ms,
3612 ],
3613 ) {
3614 return (
3615 Err(Error::storage(format!(
3616 "ingest_document: INSERT pending_index (idx {idx}): {e}"
3617 ))),
3618 Vec::new(),
3619 );
3620 }
3621
3622 chunk_records.push((chunk_id, rowid, embedding.clone()));
3623 }
3624
3625 if !redaction_match_counts.is_empty() {
3628 let aggregated: Vec<crate::redaction::RedactionMatch> = redaction_match_counts
3629 .iter()
3630 .map(|(name, count)| crate::redaction::RedactionMatch {
3631 pattern_name: name.clone(),
3632 count: *count,
3633 })
3634 .collect();
3635 if let Err(e) = insert_audit_row_in_tx(
3636 &tx,
3637 &redaction_audit_event(
3638 now_ms,
3639 audit_principal.clone(),
3640 Some(doc_id.to_string()),
3641 &aggregated,
3642 ),
3643 ) {
3644 return (Err(e), Vec::new());
3645 }
3646 }
3647
3648 if let Err(e) = insert_audit_row_in_tx(
3652 &tx,
3653 &AuditEvent {
3654 ts_ms: now_ms,
3655 principal_subject: audit_principal.clone(),
3656 operation: AuditOperation::MemoryIngestDocument,
3657 target_id: Some(doc_id.to_string()),
3658 result: AuditResult::Ok,
3659 details: Some(serde_json::json!({
3660 "chunks_persisted": chunk_count,
3661 "bytes_ingested": parsed.byte_size,
3662 })),
3663 },
3664 ) {
3665 return (Err(e), Vec::new());
3666 }
3667
3668 if let Err(e) = tx.commit() {
3670 return (
3671 Err(Error::storage(format!(
3672 "ingest_document: COMMIT: {e}"
3673 ))),
3674 Vec::new(),
3675 );
3676 }
3677
3678 let mut drained: Vec<solo_core::ChunkId> = Vec::with_capacity(chunk_records.len());
3684 for (chunk_id, rowid, embedding) in &chunk_records {
3685 let f32_slice = match embedding.as_f32_slice() {
3686 Some(s) => s,
3687 None => {
3688 tracing::warn!(
3689 %chunk_id,
3690 "ingest_document: chunk embedding is not F32; HNSW add skipped \
3691 (pending_index row will be replayed)"
3692 );
3693 continue;
3694 }
3695 };
3696 match self.hnsw.add(chunk_hnsw_id(*rowid), f32_slice) {
3700 Ok(_) => drained.push(*chunk_id),
3701 Err(e) => {
3702 tracing::warn!(
3703 %chunk_id,
3704 error = %e,
3705 "ingest_document: hnsw.add failed; pending_index row left for replay"
3706 );
3707 }
3708 }
3709 }
3710
3711 tracing::info!(
3712 %doc_id,
3713 chunks = chunk_records.len(),
3714 indexed = drained.len(),
3715 bytes = parsed.byte_size,
3716 "ingest_document complete"
3717 );
3718
3719 (
3720 Ok(IngestReport {
3721 doc_id,
3722 chunks_persisted: chunk_count,
3723 bytes_ingested: parsed.byte_size,
3724 deduped: false,
3725 }),
3726 drained,
3727 )
3728 }
3729
3730 fn handle_forget_document(
3748 &mut self,
3749 doc_id: solo_core::DocumentId,
3750 audit_principal: Option<String>,
3751 ) -> Result<ForgetDocumentReport> {
3752 let id_str = doc_id.to_string();
3753
3754 let exists: Option<String> = self
3756 .conn
3757 .query_row(
3758 "SELECT status FROM documents WHERE doc_id = ?",
3759 params![&id_str],
3760 |r| r.get::<_, String>(0),
3761 )
3762 .optional()
3763 .map_err(|e| {
3764 Error::storage(format!("forget_document: lookup status: {e}"))
3765 })?;
3766 let Some(prior_status) = exists else {
3767 return Err(Error::not_found(format!(
3768 "doc_id {doc_id} not found in documents"
3769 )));
3770 };
3771
3772 let tx = self
3774 .conn
3775 .transaction_with_behavior(TransactionBehavior::Immediate)
3776 .map_err(|e| {
3777 Error::storage(format!("forget_document: BEGIN IMMEDIATE: {e}"))
3778 })?;
3779
3780 tx.execute(
3783 "UPDATE documents
3784 SET status = 'forgotten'
3785 WHERE doc_id = ? AND status <> 'forgotten'",
3786 params![&id_str],
3787 )
3788 .map_err(|e| {
3789 Error::storage(format!("forget_document: UPDATE status: {e}"))
3790 })?;
3791
3792 insert_audit_row_in_tx(
3794 &tx,
3795 &AuditEvent {
3796 ts_ms: chrono::Utc::now().timestamp_millis(),
3797 principal_subject: audit_principal,
3798 operation: AuditOperation::MemoryForgetDocument,
3799 target_id: Some(id_str.clone()),
3800 result: AuditResult::Ok,
3801 details: None,
3802 },
3803 )?;
3804
3805 tx.commit().map_err(|e| {
3806 Error::storage(format!("forget_document: COMMIT: {e}"))
3807 })?;
3808
3809 let mut stmt = self
3811 .conn
3812 .prepare("SELECT rowid FROM document_chunks WHERE doc_id = ?")
3813 .map_err(|e| {
3814 Error::storage(format!(
3815 "forget_document: prepare chunk-rowid select: {e}"
3816 ))
3817 })?;
3818 let rowids: Vec<i64> = stmt
3829 .query_map(params![&id_str], |r| r.get::<_, i64>(0))
3830 .map_err(|e| {
3831 Error::storage(format!(
3832 "forget_document: query chunk rowids: {e}"
3833 ))
3834 })?
3835 .collect::<std::result::Result<Vec<i64>, rusqlite::Error>>()
3836 .map_err(|e| {
3837 Error::storage(format!(
3838 "forget_document: decode chunk rowid row: {e}"
3839 ))
3840 })?;
3841
3842 let chunks_tombstoned = rowids.len() as u32;
3843 for rowid in rowids {
3844 if let Err(e) = self.hnsw.remove(chunk_hnsw_id(rowid)) {
3847 tracing::warn!(
3848 rowid,
3849 %doc_id,
3850 error = %e,
3851 "forget_document: hnsw.remove failed (non-fatal; SQL filter still hides chunk)"
3852 );
3853 }
3854 }
3855
3856 if prior_status == "forgotten" {
3857 tracing::debug!(
3858 %doc_id,
3859 "forget_document called on already-forgotten doc (idempotent)"
3860 );
3861 } else {
3862 tracing::info!(
3863 %doc_id,
3864 chunks_tombstoned,
3865 "document soft-deleted (status=forgotten)"
3866 );
3867 }
3868 Ok(ForgetDocumentReport {
3869 doc_id,
3870 chunks_tombstoned,
3871 })
3872 }
3873
3874 fn handle_consolidate(
3875 &mut self,
3876 scope: ConsolidationScope,
3877 audit_principal: Option<String>,
3878 ) -> Result<ConsolidationReport> {
3879 if let Some(embedder) = self.embedder.as_ref() {
3891 if embedder.name() == crate::embedder::STUB_EMBEDDER_NAME {
3892 if std::env::var_os("SOLO_REFUSE_STUB_EMBEDDER").is_some() {
3893 return Err(Error::invalid_input(
3894 "consolidation refused: StubEmbedder produces \
3895 non-semantic vectors. Set SOLO_EMBEDDER=bundled \
3896 or =ollama, or unset SOLO_REFUSE_STUB_EMBEDDER to \
3897 downgrade this to a warning."
3898 .to_string(),
3899 ));
3900 }
3901 tracing::error!(
3902 "consolidation running with StubEmbedder — cluster \
3903 membership is BLAKE3-hash proximity, not semantic. \
3904 Configure SOLO_EMBEDDER=bundled or =ollama for real \
3905 vectors. Set SOLO_REFUSE_STUB_EMBEDDER=1 to make this \
3906 a hard error."
3907 );
3908 }
3909 }
3910 let result = self.handle_consolidate_impl(scope);
3920 match &result {
3926 Ok(report) => self.emit_audit_best_effort(
3927 AuditOperation::MemoryConsolidate,
3928 None,
3929 AuditResult::Ok,
3930 audit_principal,
3931 Some(serde_json::json!({
3932 "episodes_seen": report.episodes_seen,
3933 "clusters_built": report.clusters_built,
3934 "abstractions_built": report.abstractions_built,
3935 "triples_built": report.triples_built,
3936 "contradictions_found": report.contradictions_found,
3937 })),
3938 ),
3939 Err(e) => self.emit_audit_best_effort(
3940 AuditOperation::MemoryConsolidate,
3941 None,
3942 AuditResult::Error,
3943 audit_principal,
3944 Some(serde_json::json!({ "error": e.to_string() })),
3945 ),
3946 }
3947 result
3948 }
3949
3950 fn handle_consolidate_impl(
3954 &mut self,
3955 scope: ConsolidationScope,
3956 ) -> Result<ConsolidationReport> {
3957 let current_id = self.embedder_id.ok_or_else(|| {
3958 Error::Other(
3959 "consolidate: writer has no current embedder_id (use spawn_full_with_embedder)"
3960 .into(),
3961 )
3962 })?;
3963
3964 let active_steward: Option<Arc<solo_steward::Steward>> =
3970 self.current_steward();
3971
3972 let now_ms = chrono::Utc::now().timestamp_millis();
3975 let cutoff_ms: Option<i64> = scope.window_days.and_then(|days| {
3976 const MS_PER_DAY: i64 = 86_400_000;
3977 days.checked_mul(MS_PER_DAY).map(|w| now_ms - w)
3978 });
3979
3980 let candidates: Vec<(Episode, Embedding)> = {
3999 let (sql, params): (&str, Vec<rusqlite::types::Value>) = match cutoff_ms {
4000 Some(cutoff) => (
4001 "SELECT e.memory_id, e.ts_ms, e.source_type, e.content,
4002 e.confidence, e.strength, e.salience,
4003 em.dtype, em.dim, em.vector
4004 FROM episodes e
4005 JOIN embeddings em ON em.memory_id = e.memory_id
4006 WHERE em.embedder_id = ?1
4007 AND e.status = 'active'
4008 AND e.tier = 'hot'
4009 AND e.ts_ms >= ?2
4010 AND e.memory_id NOT IN (SELECT memory_id FROM cluster_episodes)
4011 ORDER BY e.ts_ms, e.rowid",
4012 vec![current_id.into(), cutoff.into()],
4013 ),
4014 None => (
4015 "SELECT e.memory_id, e.ts_ms, e.source_type, e.content,
4016 e.confidence, e.strength, e.salience,
4017 em.dtype, em.dim, em.vector
4018 FROM episodes e
4019 JOIN embeddings em ON em.memory_id = e.memory_id
4020 WHERE em.embedder_id = ?1
4021 AND e.status = 'active'
4022 AND e.tier = 'hot'
4023 AND e.memory_id NOT IN (SELECT memory_id FROM cluster_episodes)
4024 ORDER BY e.ts_ms, e.rowid",
4025 vec![current_id.into()],
4026 ),
4027 };
4028
4029 let mut stmt = self
4030 .conn
4031 .prepare(sql)
4032 .map_err(|e| Error::storage(format!("prepare consolidate select: {e}")))?;
4033 let rows = stmt
4034 .query_map(params_from_iter(¶ms), |r| {
4035 let memory_id: String = r.get(0)?;
4036 let ts_ms: i64 = r.get(1)?;
4037 let source_type: String = r.get(2)?;
4038 let content: String = r.get(3)?;
4039 let confidence_f: f32 = r.get(4)?;
4040 let strength: f32 = r.get(5)?;
4041 let salience: f32 = r.get(6)?;
4042 let dtype_str: String = r.get(7)?;
4043 let dim: i64 = r.get(8)?;
4044 let vector: Vec<u8> = r.get(9)?;
4045 Ok((
4046 memory_id,
4047 ts_ms,
4048 source_type,
4049 content,
4050 confidence_f,
4051 strength,
4052 salience,
4053 dtype_str,
4054 dim,
4055 vector,
4056 ))
4057 })
4058 .map_err(|e| Error::storage(format!("query_map consolidate: {e}")))?;
4059
4060 let mut out = Vec::new();
4061 for row in rows {
4062 let (memory_id, ts_ms, source_type, content, conf, strength, salience, dtype_str, dim, vector) =
4063 row.map_err(|e| {
4064 Error::storage(format!("consolidate row decode: {e}"))
4065 })?;
4066 let mid = MemoryId::from_str(&memory_id).map_err(|e| {
4067 Error::storage(format!("parse memory_id `{memory_id}`: {e}"))
4068 })?;
4069 let dtype = match dtype_str.as_str() {
4070 "f32" => solo_core::EmbeddingDtype::F32,
4071 "f16" => solo_core::EmbeddingDtype::F16,
4072 "i8" => solo_core::EmbeddingDtype::I8,
4073 "binary" => solo_core::EmbeddingDtype::Binary,
4074 other => {
4075 return Err(Error::storage(format!(
4076 "unknown embeddings.dtype value `{other}`"
4077 )));
4078 }
4079 };
4080 let embedding = Embedding {
4081 dtype,
4082 dim: dim as usize,
4083 data: vector,
4084 };
4085 let confidence = solo_core::Confidence::new(conf).map_err(|e| {
4086 Error::storage(format!("invalid confidence in episodes row: {e}"))
4087 })?;
4088 let episode = Episode {
4089 memory_id: mid,
4090 ts_ms,
4091 source_type,
4092 source_id: None, content,
4094 encoding_context: solo_core::EncodingContext::default(),
4095 provenance: None,
4096 confidence,
4097 strength,
4098 salience,
4099 tier: Tier::Hot,
4100 };
4101 out.push((episode, embedding));
4102 }
4103 out
4104 };
4105
4106 let mut report = ConsolidationReport {
4107 episodes_seen: candidates.len(),
4108 ..ConsolidationReport::default()
4109 };
4110
4111 if candidates.is_empty() && !scope.force_merge {
4112 tracing::info!(seen = 0, "consolidate: no candidates");
4113 return Ok(report);
4114 }
4115 if candidates.is_empty() {
4116 tracing::info!(
4117 seen = 0,
4118 "consolidate: no candidates, but force_merge set; falling through to merge + regen"
4119 );
4120 }
4121
4122 let config = match active_steward.as_ref() {
4142 Some(s) => s.config().clone(),
4143 None => solo_steward::StewardConfig::from_env()?,
4144 };
4145 let mut clusters = solo_steward::cluster::cluster_episodes(&candidates, &config)?;
4146
4147 let absorbed =
4157 solo_steward::cluster::merge_clusters_by_centroid(&mut clusters, &config)?;
4158 report.clusters_merged = absorbed;
4159 if absorbed > 0 {
4160 tracing::info!(
4161 absorbed,
4162 survivors = clusters.len(),
4163 "consolidate: centroid merge collapsed cross-bucket clusters"
4164 );
4165 }
4166
4167 report.clusters_built = clusters.len();
4168 report.episodes_clustered = clusters.iter().map(|c| c.episode_ids.len()).sum();
4169
4170 if clusters.is_empty() {
4171 tracing::info!(
4172 seen = report.episodes_seen,
4173 "consolidate: no new clusters formed; falling through to merge + regen"
4174 );
4175 }
4181
4182 let expected_dim = if let Some(c) = candidates.first() {
4208 c.1.dim
4209 } else {
4210 self.conn
4211 .query_row(
4212 "SELECT dim FROM embedders WHERE embedder_id = ?",
4213 params![current_id],
4214 |r| r.get::<_, i64>(0),
4215 )
4216 .map(|d| d as usize)
4217 .map_err(|e| {
4218 Error::storage(format!(
4219 "consolidate force_merge: lookup dim for embedder_id {current_id}: {e}"
4220 ))
4221 })?
4222 };
4223 let existing_summaries =
4224 self.fetch_existing_cluster_summaries(cutoff_ms, expected_dim)?;
4225 let absorb_plan = if existing_summaries.is_empty() {
4226 solo_steward::cluster::AbsorbPlan::default()
4227 } else {
4228 solo_steward::cluster::absorb_into_existing(
4229 &clusters,
4230 &existing_summaries,
4231 &config,
4232 )?
4233 };
4234 report.clusters_absorbed = absorb_plan.absorptions.len();
4235 if !absorb_plan.absorptions.is_empty() {
4236 tracing::info!(
4237 absorbed = absorb_plan.absorptions.len(),
4238 existing_modified = absorb_plan.modified_existing_ids().len(),
4239 "consolidate: cross-run absorb folded clusters into existing"
4240 );
4241 }
4242
4243 let absorbed_by_new: std::collections::HashMap<
4246 MemoryId,
4247 &solo_steward::cluster::AbsorbedCluster,
4248 > = absorb_plan
4249 .absorptions
4250 .iter()
4251 .map(|a| (a.new_cluster_id, a))
4252 .collect();
4253
4254 report.clusters_built = clusters.len() - absorb_plan.absorptions.len();
4258
4259 let txn = self
4263 .conn
4264 .transaction_with_behavior(TransactionBehavior::Immediate)
4265 .map_err(|e| Error::storage(format!("BEGIN consolidate: {e}")))?;
4266
4267 for cluster in &clusters {
4268 if let Some(absorbed) = absorbed_by_new.get(&cluster.cluster_id) {
4275 let target_id_s = absorbed.existing_cluster_id.to_string();
4276 for memid in &cluster.episode_ids {
4277 txn.execute(
4278 "INSERT INTO cluster_episodes (cluster_id, memory_id) VALUES (?, ?)",
4279 params![target_id_s, memid.to_string()],
4280 )
4281 .map_err(|e| {
4282 Error::storage(format!("INSERT cluster_episodes (absorbed): {e}"))
4283 })?;
4284 }
4285 continue;
4286 }
4287
4288 let centroid_dtype: Option<&'static str> = cluster.centroid.as_ref().map(|e| {
4289 match e.dtype {
4290 solo_core::EmbeddingDtype::F32 => "f32",
4291 solo_core::EmbeddingDtype::F16 => "f16",
4292 solo_core::EmbeddingDtype::I8 => "i8",
4293 solo_core::EmbeddingDtype::Binary => "binary",
4294 }
4295 });
4296 let centroid_dim: Option<i64> =
4297 cluster.centroid.as_ref().map(|e| e.dim as i64);
4298 let centroid_blob: Option<&[u8]> =
4299 cluster.centroid.as_ref().map(|e| e.data.as_slice());
4300
4301 txn.execute(
4302 "INSERT INTO clusters (cluster_id, centroid, centroid_dtype, centroid_dim, coherence, created_at_ms)
4303 VALUES (?, ?, ?, ?, ?, ?)",
4304 params![
4305 cluster.cluster_id.to_string(),
4306 centroid_blob,
4307 centroid_dtype,
4308 centroid_dim,
4309 cluster.coherence as f64,
4310 now_ms,
4311 ],
4312 )
4313 .map_err(|e| Error::storage(format!("INSERT cluster: {e}")))?;
4314
4315 for memid in &cluster.episode_ids {
4316 txn.execute(
4317 "INSERT INTO cluster_episodes (cluster_id, memory_id) VALUES (?, ?)",
4318 params![cluster.cluster_id.to_string(), memid.to_string()],
4319 )
4320 .map_err(|e| Error::storage(format!("INSERT cluster_episodes: {e}")))?;
4321 }
4322 }
4323
4324 let mut modified_existing: Vec<&solo_steward::cluster::AbsorbedCluster> =
4329 absorb_plan.absorptions.iter().collect();
4330 modified_existing.sort_by(|a, b| a.existing_cluster_id.cmp(&b.existing_cluster_id));
4331 let mut last_per_existing: std::collections::HashMap<
4336 MemoryId,
4337 &solo_steward::cluster::AbsorbedCluster,
4338 > = std::collections::HashMap::new();
4339 for a in &absorb_plan.absorptions {
4340 last_per_existing.insert(a.existing_cluster_id, a);
4341 }
4342 let mut existing_ids_sorted: Vec<MemoryId> =
4343 last_per_existing.keys().copied().collect();
4344 existing_ids_sorted.sort();
4345 for existing_id in existing_ids_sorted {
4346 let absorbed = last_per_existing[&existing_id];
4347 txn.execute(
4348 "UPDATE clusters
4349 SET centroid = ?, centroid_dtype = ?, centroid_dim = ?, coherence = ?
4350 WHERE cluster_id = ?",
4351 params![
4352 absorbed.merged_centroid.data.as_slice(),
4353 "f32",
4354 absorbed.merged_centroid.dim as i64,
4355 absorbed.merged_coherence as f64,
4356 existing_id.to_string(),
4357 ],
4358 )
4359 .map_err(|e| Error::storage(format!("UPDATE existing cluster centroid: {e}")))?;
4360 txn.execute(
4369 "DELETE FROM semantic_abstractions WHERE cluster_id = ?",
4370 params![existing_id.to_string()],
4371 )
4372 .map_err(|e| {
4373 Error::storage(format!(
4374 "DELETE stale abstraction on absorb (cluster {existing_id}): {e}"
4375 ))
4376 })?;
4377 txn.execute(
4378 "DELETE FROM triples WHERE cluster_id = ?",
4379 params![existing_id.to_string()],
4380 )
4381 .map_err(|e| {
4382 Error::storage(format!(
4383 "DELETE stale triples on absorb (cluster {existing_id}): {e}"
4384 ))
4385 })?;
4386 }
4387
4388 txn.commit()
4389 .map_err(|e| Error::storage(format!("COMMIT consolidate: {e}")))?;
4390
4391 let merge_plan: solo_steward::cluster::MergePlan =
4420 if active_steward.is_some() {
4421 let existing_full =
4422 self.fetch_existing_clusters_full(cutoff_ms, expected_dim)?;
4423 if existing_full.len() < 2 {
4424 solo_steward::cluster::MergePlan::default()
4425 } else {
4426 solo_steward::cluster::plan_existing_merges(
4427 &existing_full,
4428 &config,
4429 )?
4430 }
4431 } else {
4432 solo_steward::cluster::MergePlan::default()
4433 };
4434 report.existing_clusters_merged = merge_plan.absorbed();
4435
4436 if !merge_plan.merges.is_empty() {
4437 tracing::info!(
4438 merges = merge_plan.merges.len(),
4439 absorbed = merge_plan.absorbed(),
4440 "consolidate: existing-vs-existing merge applied"
4441 );
4442 let merge_txn = self
4443 .conn
4444 .transaction_with_behavior(TransactionBehavior::Immediate)
4445 .map_err(|e| {
4446 Error::storage(format!("BEGIN existing-merge txn: {e}"))
4447 })?;
4448 for op in &merge_plan.merges {
4449 let survivor_str = op.survivor_id.to_string();
4450 for loser_id in &op.loser_ids {
4452 merge_txn
4453 .execute(
4454 "UPDATE cluster_episodes
4455 SET cluster_id = ?1
4456 WHERE cluster_id = ?2",
4457 params![survivor_str, loser_id.to_string()],
4458 )
4459 .map_err(|e| {
4460 Error::storage(format!(
4461 "UPDATE cluster_episodes (existing-merge): {e}"
4462 ))
4463 })?;
4464 }
4465 merge_txn
4467 .execute(
4468 "UPDATE clusters
4469 SET centroid = ?, centroid_dtype = ?, centroid_dim = ?, coherence = ?
4470 WHERE cluster_id = ?",
4471 params![
4472 op.merged_centroid.data.as_slice(),
4473 "f32",
4474 op.merged_centroid.dim as i64,
4475 op.merged_coherence as f64,
4476 survivor_str,
4477 ],
4478 )
4479 .map_err(|e| {
4480 Error::storage(format!(
4481 "UPDATE clusters (existing-merge): {e}"
4482 ))
4483 })?;
4484 for loser_id in &op.loser_ids {
4488 merge_txn
4489 .execute(
4490 "DELETE FROM clusters WHERE cluster_id = ?",
4491 params![loser_id.to_string()],
4492 )
4493 .map_err(|e| {
4494 Error::storage(format!(
4495 "DELETE clusters (existing-merge): {e}"
4496 ))
4497 })?;
4498 }
4499 }
4500 merge_txn
4501 .commit()
4502 .map_err(|e| Error::storage(format!("COMMIT existing-merge: {e}")))?;
4503 }
4504
4505 let _writer_actor_no_longer_does_llm_inline =
4554 (active_steward.is_some(), self.runtime_handle.is_some());
4555
4556 tracing::info!(
4557 seen = report.episodes_seen,
4558 clusters = report.clusters_built,
4559 episodes_clustered = report.episodes_clustered,
4560 abstractions = report.abstractions_built,
4561 triples = report.triples_built,
4562 contradictions = report.contradictions_found,
4563 "consolidate complete"
4564 );
4565 Ok(report)
4566 }
4567
4568 fn fetch_triples_for_pair(
4580 &self,
4581 subject_id: &str,
4582 predicate: &str,
4583 exclude: &std::collections::HashSet<MemoryId>,
4584 ) -> Result<Vec<solo_core::Triple>> {
4585 let mut stmt = self
4586 .conn
4587 .prepare(
4588 "SELECT triple_id, object_id, object_kind, valid_from_ms, valid_to_ms,
4589 confidence, provenance_json
4590 FROM triples
4591 WHERE subject_id = ?1 AND predicate = ?2
4592 AND status = 'active'",
4593 )
4594 .map_err(|e| Error::storage(format!("prepare fetch_triples_for_pair: {e}")))?;
4595 let rows = stmt
4596 .query_map(params![subject_id, predicate], |r| {
4597 Ok((
4598 r.get::<_, String>(0)?,
4599 r.get::<_, String>(1)?,
4600 r.get::<_, String>(2)?,
4601 r.get::<_, i64>(3)?,
4602 r.get::<_, Option<i64>>(4)?,
4603 r.get::<_, f32>(5)?,
4604 r.get::<_, String>(6)?,
4605 ))
4606 })
4607 .map_err(|e| Error::storage(format!("query_map triples: {e}")))?;
4608
4609 let mut out = Vec::new();
4610 for row in rows {
4611 let (triple_id_s, object_id, object_kind_s, valid_from_ms, valid_to_ms, conf, prov_s) =
4612 row.map_err(|e| Error::storage(format!("triples row decode: {e}")))?;
4613 let triple_id = MemoryId::from_str(&triple_id_s).map_err(|e| {
4614 Error::storage(format!("parse triple_id `{triple_id_s}`: {e}"))
4615 })?;
4616 if exclude.contains(&triple_id) {
4617 continue;
4618 }
4619 let object_kind = match object_kind_s.as_str() {
4620 "entity" => solo_core::TripleObjectKind::Entity,
4621 "literal" => solo_core::TripleObjectKind::Literal,
4622 other => {
4623 return Err(Error::storage(format!(
4624 "unknown object_kind value `{other}` in triples row"
4625 )));
4626 }
4627 };
4628 let confidence = solo_core::Confidence::new(conf).map_err(|e| {
4629 Error::storage(format!("invalid confidence in triples row: {e}"))
4630 })?;
4631 let provenance: solo_core::Provenance = serde_json::from_str(&prov_s)
4632 .unwrap_or_else(|_| solo_core::Provenance {
4633 derived_from: vec![],
4634 derivation: "(unparseable)".into(),
4635 by: "(unknown)".into(),
4636 at_ms: 0,
4637 });
4638 out.push(solo_core::Triple {
4639 triple_id,
4640 subject_id: subject_id.to_string(),
4641 predicate: predicate.to_string(),
4642 object_id,
4643 object_kind,
4644 valid_from_ms,
4645 valid_to_ms,
4646 confidence,
4647 provenance,
4648 });
4649 }
4650 Ok(out)
4651 }
4652
4653 fn fetch_existing_cluster_summaries(
4675 &self,
4676 cutoff_ms: Option<i64>,
4677 expected_dim: usize,
4678 ) -> Result<Vec<solo_steward::cluster::ExistingClusterSummary>> {
4679 let (sql, params): (&str, Vec<rusqlite::types::Value>) = match cutoff_ms {
4680 Some(cutoff) => (
4681 "SELECT c.cluster_id, c.centroid, c.centroid_dtype, c.centroid_dim,
4682 c.coherence,
4683 (SELECT COUNT(*) FROM cluster_episodes ce
4684 WHERE ce.cluster_id = c.cluster_id) AS episode_count
4685 FROM clusters c
4686 WHERE c.centroid IS NOT NULL
4687 AND c.centroid_dtype = 'f32'
4688 AND c.centroid_dim = ?1
4689 AND c.created_at_ms >= ?2
4690 ORDER BY c.cluster_id",
4691 vec![(expected_dim as i64).into(), cutoff.into()],
4692 ),
4693 None => (
4694 "SELECT c.cluster_id, c.centroid, c.centroid_dtype, c.centroid_dim,
4695 c.coherence,
4696 (SELECT COUNT(*) FROM cluster_episodes ce
4697 WHERE ce.cluster_id = c.cluster_id) AS episode_count
4698 FROM clusters c
4699 WHERE c.centroid IS NOT NULL
4700 AND c.centroid_dtype = 'f32'
4701 AND c.centroid_dim = ?1
4702 ORDER BY c.cluster_id",
4703 vec![(expected_dim as i64).into()],
4704 ),
4705 };
4706
4707 let mut stmt = self
4708 .conn
4709 .prepare(sql)
4710 .map_err(|e| Error::storage(format!("prepare existing-cluster summaries: {e}")))?;
4711 let rows = stmt
4712 .query_map(params_from_iter(¶ms), |r| {
4713 Ok((
4714 r.get::<_, String>(0)?, r.get::<_, Vec<u8>>(1)?, r.get::<_, String>(2)?, r.get::<_, i64>(3)?, r.get::<_, f32>(4)?, r.get::<_, i64>(5)?, ))
4721 })
4722 .map_err(|e| Error::storage(format!("query_map existing clusters: {e}")))?;
4723
4724 let mut out: Vec<solo_steward::cluster::ExistingClusterSummary> = Vec::new();
4725 for row in rows {
4726 let (cid_s, centroid_bytes, dtype_s, dim_i, coherence, count_i) =
4727 row.map_err(|e| Error::storage(format!("cluster row decode: {e}")))?;
4728 if dtype_s != "f32" || (dim_i as usize) != expected_dim {
4731 continue;
4732 }
4733 let cluster_id = match MemoryId::from_str(&cid_s) {
4734 Ok(id) => id,
4735 Err(e) => {
4736 tracing::warn!(
4737 cluster_id = %cid_s,
4738 error = %e,
4739 "skipping cluster with unparseable cluster_id"
4740 );
4741 continue;
4742 }
4743 };
4744 let centroid = solo_core::Embedding {
4745 dtype: solo_core::EmbeddingDtype::F32,
4746 dim: dim_i as usize,
4747 data: centroid_bytes,
4748 };
4749 let episode_count = count_i.max(0) as usize;
4753 if episode_count == 0 {
4757 continue;
4758 }
4759 out.push(solo_steward::cluster::ExistingClusterSummary {
4760 cluster_id,
4761 centroid,
4762 coherence,
4763 episode_count,
4764 });
4765 }
4766 Ok(out)
4767 }
4768
4769 fn fetch_existing_clusters_full(
4781 &self,
4782 cutoff_ms: Option<i64>,
4783 expected_dim: usize,
4784 ) -> Result<Vec<solo_core::Cluster>> {
4785 let (sql, params): (&str, Vec<rusqlite::types::Value>) = match cutoff_ms {
4786 Some(cutoff) => (
4787 "SELECT c.cluster_id, c.centroid, c.centroid_dtype, c.centroid_dim,
4788 c.coherence, ce.memory_id
4789 FROM clusters c
4790 JOIN cluster_episodes ce ON ce.cluster_id = c.cluster_id
4791 WHERE c.centroid IS NOT NULL
4792 AND c.centroid_dtype = 'f32'
4793 AND c.centroid_dim = ?1
4794 AND c.created_at_ms >= ?2
4795 ORDER BY c.cluster_id, ce.memory_id",
4796 vec![(expected_dim as i64).into(), cutoff.into()],
4797 ),
4798 None => (
4799 "SELECT c.cluster_id, c.centroid, c.centroid_dtype, c.centroid_dim,
4800 c.coherence, ce.memory_id
4801 FROM clusters c
4802 JOIN cluster_episodes ce ON ce.cluster_id = c.cluster_id
4803 WHERE c.centroid IS NOT NULL
4804 AND c.centroid_dtype = 'f32'
4805 AND c.centroid_dim = ?1
4806 ORDER BY c.cluster_id, ce.memory_id",
4807 vec![(expected_dim as i64).into()],
4808 ),
4809 };
4810
4811 let mut stmt = self
4812 .conn
4813 .prepare(sql)
4814 .map_err(|e| Error::storage(format!("prepare existing clusters full: {e}")))?;
4815 let rows = stmt
4816 .query_map(params_from_iter(¶ms), |r| {
4817 Ok((
4818 r.get::<_, String>(0)?, r.get::<_, Vec<u8>>(1)?, r.get::<_, String>(2)?, r.get::<_, i64>(3)?, r.get::<_, f32>(4)?, r.get::<_, String>(5)?, ))
4825 })
4826 .map_err(|e| Error::storage(format!("query_map clusters full: {e}")))?;
4827
4828 let mut out: Vec<solo_core::Cluster> = Vec::new();
4831 for row in rows {
4832 let (cid_s, centroid_bytes, dtype_s, dim_i, coherence, memid_s) =
4833 row.map_err(|e| Error::storage(format!("clusters full row decode: {e}")))?;
4834 if dtype_s != "f32" || (dim_i as usize) != expected_dim {
4835 continue;
4836 }
4837 let cluster_id = match MemoryId::from_str(&cid_s) {
4838 Ok(id) => id,
4839 Err(e) => {
4840 tracing::warn!(
4841 cluster_id = %cid_s,
4842 error = %e,
4843 "skipping cluster with unparseable cluster_id"
4844 );
4845 continue;
4846 }
4847 };
4848 let memory_id = match MemoryId::from_str(&memid_s) {
4849 Ok(id) => id,
4850 Err(e) => {
4851 tracing::warn!(
4852 memory_id = %memid_s,
4853 error = %e,
4854 "skipping cluster_episodes row with unparseable memory_id"
4855 );
4856 continue;
4857 }
4858 };
4859 if out.last().map(|c| c.cluster_id) == Some(cluster_id) {
4863 out.last_mut().unwrap().episode_ids.push(memory_id);
4864 } else {
4865 let centroid = solo_core::Embedding {
4866 dtype: solo_core::EmbeddingDtype::F32,
4867 dim: dim_i as usize,
4868 data: centroid_bytes,
4869 };
4870 out.push(solo_core::Cluster {
4871 cluster_id,
4872 episode_ids: vec![memory_id],
4873 centroid: Some(centroid),
4874 coherence,
4875 });
4876 }
4877 }
4878 out.retain(|c| !c.episode_ids.is_empty());
4881 Ok(out)
4882 }
4883
4884 fn fetch_episodes_for_cluster(
4899 &self,
4900 cluster_id: &MemoryId,
4901 ) -> Result<Vec<Episode>> {
4902 let mut stmt = self
4903 .conn
4904 .prepare(
4905 "SELECT e.memory_id, e.ts_ms, e.source_type, e.source_id,
4906 e.content, e.encoding_context_json, e.provenance_json,
4907 e.confidence, e.strength, e.salience, e.tier
4908 FROM episodes e
4909 JOIN cluster_episodes ce ON ce.memory_id = e.memory_id
4910 WHERE ce.cluster_id = ?1
4911 AND e.status = 'active'
4912 ORDER BY e.ts_ms, e.rowid",
4913 )
4914 .map_err(|e| {
4915 Error::storage(format!("prepare fetch_episodes_for_cluster: {e}"))
4916 })?;
4917 let rows = stmt
4918 .query_map(params![cluster_id.to_string()], |r| {
4919 Ok((
4920 r.get::<_, String>(0)?, r.get::<_, i64>(1)?, r.get::<_, String>(2)?, r.get::<_, Option<String>>(3)?, r.get::<_, String>(4)?, r.get::<_, String>(5)?, r.get::<_, Option<String>>(6)?, r.get::<_, f32>(7)?, r.get::<_, f32>(8)?, r.get::<_, f32>(9)?, r.get::<_, String>(10)?, ))
4932 })
4933 .map_err(|e| Error::storage(format!("query_map cluster episodes: {e}")))?;
4934
4935 let mut out: Vec<Episode> = Vec::new();
4936 for row in rows {
4937 let (
4938 mid_s,
4939 ts_ms,
4940 source_type,
4941 source_id,
4942 content,
4943 ctx_json,
4944 prov_json,
4945 conf,
4946 strength,
4947 salience,
4948 tier_s,
4949 ) = row.map_err(|e| Error::storage(format!("episode row decode: {e}")))?;
4950 let mid = MemoryId::from_str(&mid_s)
4951 .map_err(|e| Error::storage(format!("parse memory_id `{mid_s}`: {e}")))?;
4952 let confidence = solo_core::Confidence::new(conf).map_err(|e| {
4953 Error::storage(format!("invalid confidence in episode row: {e}"))
4954 })?;
4955 let encoding_context: solo_core::EncodingContext =
4956 serde_json::from_str(&ctx_json).unwrap_or_default();
4957 let provenance: Option<solo_core::Provenance> = prov_json
4958 .as_deref()
4959 .and_then(|s| serde_json::from_str(s).ok());
4960 let tier = match tier_s.as_str() {
4961 "hot" => Tier::Hot,
4962 "warm" => Tier::Warm,
4963 "cold" => Tier::Cold,
4964 other => {
4965 return Err(Error::storage(format!(
4966 "unknown tier value `{other}` in episodes row"
4967 )));
4968 }
4969 };
4970 out.push(Episode {
4971 memory_id: mid,
4972 ts_ms,
4973 source_type,
4974 source_id,
4975 content,
4976 encoding_context,
4977 provenance,
4978 confidence,
4979 strength,
4980 salience,
4981 tier,
4982 });
4983 }
4984 Ok(out)
4985 }
4986
4987 fn handle_reembed(
4988 &mut self,
4989 scope: ReembedScope,
4990 audit_principal: Option<String>,
4991 ) -> Result<ReembedReport> {
4992 let result = self.handle_reembed_impl(scope);
4993 match &result {
4997 Ok(report) => self.emit_audit_best_effort(
4998 AuditOperation::MemoryReembed,
4999 None,
5000 AuditResult::Ok,
5001 audit_principal,
5002 Some(serde_json::json!({
5003 "rows_seen": report.rows_seen,
5004 "rows_reembedded": report.rows_reembedded,
5005 "rows_failed": report.rows_failed,
5006 "rows_gc_deleted": report.rows_gc_deleted,
5007 "dry_run": report.dry_run,
5008 })),
5009 ),
5010 Err(e) => self.emit_audit_best_effort(
5011 AuditOperation::MemoryReembed,
5012 None,
5013 AuditResult::Error,
5014 audit_principal,
5015 Some(serde_json::json!({ "error": e.to_string() })),
5016 ),
5017 }
5018 result
5019 }
5020
    /// Re-embeds active episodes whose stored embeddings were produced by an
    /// embedder other than the writer's current one.
    ///
    /// Flow:
    /// 1. Require a current `embedder_id`, an embedder, and a runtime handle.
    /// 2. Resolve `scope.from` (name, version) to an `embedder_id`, if given.
    /// 3. Collect candidate `(memory_id, content)` pairs.
    /// 4. With `scope.dry_run`, return after counting the candidates.
    /// 5. Otherwise embed each candidate and store the result in its own
    ///    transaction, optionally deleting that memory's other embedding rows
    ///    when `scope.gc` is set.
    ///
    /// Per-row failures are logged, counted in `rows_failed`, and do not stop
    /// the loop.
    ///
    /// # Errors
    ///
    /// Fails when the writer lacks an embedder id, embedder, or runtime
    /// handle; when the `from` embedder is the current one or is not
    /// registered; or when the candidate query cannot be prepared, run, or
    /// decoded.
    fn handle_reembed_impl(&mut self, scope: ReembedScope) -> Result<ReembedReport> {
        // Each prerequisite is checked separately so the error names the
        // missing piece.
        let current_id = self.embedder_id.ok_or_else(|| {
            Error::Other(
                "reembed: writer has no current embedder_id (use spawn_full_with_embedder)"
                    .into(),
            )
        })?;
        let embedder = self.embedder.clone().ok_or_else(|| {
            Error::Other(
                "reembed: writer has no embedder (use spawn_full_with_embedder)".into(),
            )
        })?;
        let runtime = self.runtime_handle.clone().ok_or_else(|| {
            Error::Other(
                "reembed: writer has no runtime handle (use spawn_full_with_embedder)"
                    .into(),
            )
        })?;

        // `None` means "any embedder other than the current one"; `Some(id)`
        // restricts the pass to rows written by that specific embedder.
        let from_id: Option<i64> = match &scope.from {
            None => None,
            Some((name, version)) => {
                let id: Option<i64> = self
                    .conn
                    .query_row(
                        "SELECT embedder_id FROM embedders WHERE name = ? AND version = ?",
                        params![name, version],
                        |r| r.get::<_, i64>(0),
                    )
                    .optional()
                    .map_err(|e| Error::storage(format!("lookup from embedder: {e}")))?;
                match id {
                    // Re-embedding from the current embedder onto itself is
                    // rejected as a caller error.
                    Some(id) if id == current_id => {
                        return Err(Error::Other(format!(
                            "reembed: from-embedder ({name}, {version}) IS the current \
                             embedder; nothing to do"
                        )));
                    }
                    Some(id) => Some(id),
                    None => {
                        return Err(Error::not_found(format!(
                            "reembed: from-embedder ({name}, {version}) not registered \
                             in `embedders` table"
                        )));
                    }
                }
            }
        };

        // The candidate list is materialised inside this block so the prepared
        // statement is dropped before the write loop opens transactions.
        let candidates: Vec<(String, String)> = {
            let (sql, bound_id): (&str, i64) = match from_id {
                None => (
                    "SELECT DISTINCT e.memory_id, e.content
                     FROM episodes e
                     JOIN embeddings em ON em.memory_id = e.memory_id
                     WHERE em.embedder_id != ?1
                       AND e.status = 'active'
                     ORDER BY e.rowid",
                    current_id,
                ),
                Some(fid) => (
                    "SELECT DISTINCT e.memory_id, e.content
                     FROM episodes e
                     JOIN embeddings em ON em.memory_id = e.memory_id
                     WHERE em.embedder_id = ?1
                       AND e.status = 'active'
                     ORDER BY e.rowid",
                    fid,
                ),
            };
            let mut stmt = self
                .conn
                .prepare(sql)
                .map_err(|e| Error::storage(format!("prepare reembed select: {e}")))?;
            let rows = stmt
                .query_map(params![bound_id], |r| {
                    Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?))
                })
                .map_err(|e| Error::storage(format!("query_map reembed: {e}")))?;
            let mut out = Vec::new();
            for row in rows {
                out.push(
                    row.map_err(|e| Error::storage(format!("reembed row decode: {e}")))?,
                );
            }
            out
        };

        // `rows_seen` is the candidate count, in dry-run mode as well.
        let mut report = ReembedReport {
            rows_seen: candidates.len(),
            rows_reembedded: 0,
            rows_failed: 0,
            rows_gc_deleted: 0,
            dry_run: scope.dry_run,
        };

        if scope.dry_run {
            tracing::info!(
                seen = report.rows_seen,
                "reembed --dry-run: would re-embed N memories"
            );
            return Ok(report);
        }

        // Text form of the embedder's dtype as written to `embeddings.dtype`.
        let dtype_str = match embedder.dtype() {
            solo_core::EmbeddingDtype::F32 => "f32",
            solo_core::EmbeddingDtype::F16 => "f16",
            solo_core::EmbeddingDtype::I8 => "i8",
            solo_core::EmbeddingDtype::Binary => "binary",
        };
        // One timestamp is shared by every row written in this pass.
        let now_ms = chrono::Utc::now().timestamp_millis();

        for (memory_id, content) in candidates {
            // The async embedder is driven to completion on the runtime handle
            // before any transaction is opened for this row.
            let embedding_res = runtime.block_on(embedder.embed(&content));
            let new_embedding = match embedding_res {
                Ok(emb) => emb,
                Err(e) => {
                    tracing::warn!(%memory_id, error = %e, "reembed: embedder failed");
                    report.rows_failed += 1;
                    continue;
                }
            };
            if let Err(e) = new_embedding.validate() {
                tracing::warn!(%memory_id, error = %e, "reembed: embedding validate failed");
                report.rows_failed += 1;
                continue;
            }

            // One transaction per memory: a failure below `continue`s without
            // committing, so it only affects this row.
            let txn = match self
                .conn
                .transaction_with_behavior(TransactionBehavior::Immediate)
            {
                Ok(t) => t,
                Err(e) => {
                    tracing::warn!(%memory_id, error = %e, "reembed: BEGIN failed");
                    report.rows_failed += 1;
                    continue;
                }
            };

            // Upsert keyed on (memory_id, embedder_id): an existing row for
            // the current embedder is overwritten in place.
            let insert_res = txn.execute(
                "INSERT INTO embeddings (memory_id, embedder_id, dtype, dim, vector, created_at_ms)
                 VALUES (?, ?, ?, ?, ?, ?)
                 ON CONFLICT(memory_id, embedder_id) DO UPDATE SET
                    dtype = excluded.dtype,
                    dim = excluded.dim,
                    vector = excluded.vector,
                    created_at_ms = excluded.created_at_ms",
                params![
                    memory_id,
                    current_id,
                    dtype_str,
                    new_embedding.dim as i64,
                    &new_embedding.data[..],
                    now_ms,
                ],
            );
            if let Err(e) = insert_res {
                tracing::warn!(%memory_id, error = %e, "reembed: INSERT failed");
                report.rows_failed += 1;
                continue;
            }

            // With `gc`, every embedding row of this memory that belongs to a
            // different embedder is deleted in the same transaction.
            let gc_count = if scope.gc {
                match txn.execute(
                    "DELETE FROM embeddings
                     WHERE memory_id = ? AND embedder_id != ?",
                    params![memory_id, current_id],
                ) {
                    Ok(n) => n,
                    Err(e) => {
                        tracing::warn!(%memory_id, error = %e, "reembed: GC DELETE failed");
                        report.rows_failed += 1;
                        continue;
                    }
                }
            } else {
                0
            };

            if let Err(e) = txn.commit() {
                tracing::warn!(%memory_id, error = %e, "reembed: COMMIT failed");
                report.rows_failed += 1;
                continue;
            }
            // Success counters only move once the commit has gone through.
            report.rows_reembedded += 1;
            report.rows_gc_deleted += gc_count;
        }

        tracing::info!(
            seen = report.rows_seen,
            reembedded = report.rows_reembedded,
            failed = report.rows_failed,
            gc_deleted = report.rows_gc_deleted,
            "reembed complete"
        );
        Ok(report)
    }
5244
    /// Rewrites `triples.subject_id` and `triples.object_id` according to a
    /// list of `(from, to)` aliases and reports how many rows each column
    /// update touched.
    ///
    /// All aliases are applied inside one `IMMEDIATE` transaction. With
    /// `dry_run` the transaction is rolled back after counting, so the report
    /// carries the would-be counts while the table stays unchanged.
    ///
    /// Audit behaviour differs by path: the empty-list and dry-run paths emit
    /// a best-effort audit row, while the committing path inserts the audit
    /// row inside the same transaction as the updates.
    ///
    /// # Errors
    ///
    /// Returns a storage error when the transaction cannot be started, an
    /// `UPDATE` fails, the rollback or commit fails, or the in-transaction
    /// audit insert fails.
    fn handle_normalize_subjects(
        &mut self,
        aliases: Vec<(String, String)>,
        dry_run: bool,
        audit_principal: Option<String>,
    ) -> Result<NormalizeReport> {
        let mut report = NormalizeReport {
            aliases_processed: aliases.len(),
            subject_rows_updated: 0,
            object_rows_updated: 0,
            dry_run,
        };

        // Nothing to rewrite: record the no-op in the audit log and return
        // without opening a transaction.
        if aliases.is_empty() {
            tracing::info!(dry_run, "normalize_subjects: empty alias list, no-op");
            self.emit_audit_best_effort(
                AuditOperation::MemoryNormalizeSubjects,
                None,
                AuditResult::Ok,
                audit_principal,
                Some(serde_json::json!({
                    "aliases_processed": 0,
                    "dry_run": dry_run,
                })),
            );
            return Ok(report);
        }

        let tx = self
            .conn
            .transaction_with_behavior(TransactionBehavior::Immediate)
            .map_err(|e| {
                Error::storage(format!("BEGIN IMMEDIATE for normalize_subjects: {e}"))
            })?;

        // Aliases are applied in list order; each runs the subject update
        // first and then the object update, so a row matching both columns is
        // counted once in each total.
        for (from, to) in &aliases {
            let subj_rows = tx
                .execute(
                    "UPDATE triples SET subject_id = ?1, updated_at_ms = ?3 \
                     WHERE subject_id = ?2",
                    params![to, from, chrono::Utc::now().timestamp_millis()],
                )
                .map_err(|e| {
                    Error::storage(format!(
                        "normalize_subjects: UPDATE subject_id ({from} -> {to}): {e}"
                    ))
                })?;
            let obj_rows = tx
                .execute(
                    "UPDATE triples SET object_id = ?1, updated_at_ms = ?3 \
                     WHERE object_id = ?2",
                    params![to, from, chrono::Utc::now().timestamp_millis()],
                )
                .map_err(|e| {
                    Error::storage(format!(
                        "normalize_subjects: UPDATE object_id ({from} -> {to}): {e}"
                    ))
                })?;
            report.subject_rows_updated += subj_rows;
            report.object_rows_updated += obj_rows;
        }

        if dry_run {
            // Discard the updates; the counts gathered above stay in the report.
            tx.rollback().map_err(|e| {
                Error::storage(format!(
                    "normalize_subjects: ROLLBACK after dry-run: {e}"
                ))
            })?;
            tracing::info!(
                aliases_processed = report.aliases_processed,
                subject_rows = report.subject_rows_updated,
                object_rows = report.object_rows_updated,
                "normalize_subjects --dry-run: rolled back (would have updated N rows)"
            );
            // Emitted after the rollback, through the best-effort path.
            self.emit_audit_best_effort(
                AuditOperation::MemoryNormalizeSubjects,
                None,
                AuditResult::Ok,
                audit_principal,
                Some(serde_json::json!({
                    "aliases_processed": report.aliases_processed,
                    "subject_rows_updated": report.subject_rows_updated,
                    "object_rows_updated": report.object_rows_updated,
                    "dry_run": true,
                })),
            );
        } else {
            // The audit row is written before COMMIT in the same transaction,
            // so it is committed together with the updates; a failed insert
            // returns early without committing.
            insert_audit_row_in_tx(
                &tx,
                &AuditEvent {
                    ts_ms: chrono::Utc::now().timestamp_millis(),
                    principal_subject: audit_principal,
                    operation: AuditOperation::MemoryNormalizeSubjects,
                    target_id: None,
                    result: AuditResult::Ok,
                    details: Some(serde_json::json!({
                        "aliases_processed": report.aliases_processed,
                        "subject_rows_updated": report.subject_rows_updated,
                        "object_rows_updated": report.object_rows_updated,
                        "dry_run": false,
                    })),
                },
            )?;
            tx.commit().map_err(|e| {
                Error::storage(format!("normalize_subjects: COMMIT: {e}"))
            })?;
            tracing::info!(
                aliases_processed = report.aliases_processed,
                subject_rows = report.subject_rows_updated,
                object_rows = report.object_rows_updated,
                "normalize_subjects complete"
            );
        }

        Ok(report)
    }
5385
5386 fn handle_backup(&mut self, dest_path: &std::path::Path) -> Result<()> {
5387 let key = self.key.as_ref().ok_or_else(|| {
5388 Error::storage(
5389 "backup called but writer has no key material configured. \
5390 Spawn the writer with `spawn_full_with_key_and_optional_steward` \
5391 to enable WriteCommand::Backup.",
5392 )
5393 })?;
5394 backup_from_connection(&self.conn, dest_path, key)
5399 }
5400
5401 fn handle_save_snapshot(&mut self) -> Result<()> {
5402 let dir = self.snapshot_dir.as_ref().ok_or_else(|| {
5403 Error::storage("save_snapshot called but writer has no snapshot_dir configured")
5404 })?;
5405 let save_result = self.hnsw.save(dir);
5409
5410 self.run_idle_maintenance();
5418
5419 save_result
5420 }
5421
5422 fn run_idle_maintenance(&mut self) {
5425 if let Err(e) = self.conn.execute_batch("PRAGMA optimize") {
5426 tracing::debug!(error = %e, "PRAGMA optimize failed (non-fatal)");
5427 }
5428 if let Err(e) = self.conn.execute_batch("PRAGMA wal_checkpoint(PASSIVE)") {
5429 tracing::debug!(error = %e, "PRAGMA wal_checkpoint(PASSIVE) failed (non-fatal)");
5430 }
5431 }
5432
5433 fn emit_invalidate(&self, reason: &str, kind: &str) {
5460 let (Some(tx), Some(tenant_id)) =
5461 (&self.invalidate_tx, &self.invalidate_tenant_id)
5462 else {
5463 return;
5464 };
5465 let event = InvalidateEvent {
5466 reason: reason.to_string(),
5467 tenant_id: tenant_id.clone(),
5468 ts_ms: chrono::Utc::now().timestamp_millis(),
5469 kind: kind.to_string(),
5470 };
5471 let _ = tx.send(event);
5475 }
5476
5477 fn emit_audit_best_effort(
5478 &mut self,
5479 operation: AuditOperation,
5480 target_id: Option<String>,
5481 result: AuditResult,
5482 principal: Option<String>,
5483 details: Option<serde_json::Value>,
5484 ) {
5485 let event = AuditEvent {
5486 ts_ms: chrono::Utc::now().timestamp_millis(),
5487 principal_subject: principal,
5488 operation,
5489 target_id,
5490 result,
5491 details,
5492 };
5493 let tx_res = self
5494 .conn
5495 .transaction_with_behavior(TransactionBehavior::Immediate);
5496 let tx = match tx_res {
5497 Ok(t) => t,
5498 Err(e) => {
5499 tracing::warn!(
5500 error = %e,
5501 operation = %operation,
5502 "audit emit: BEGIN IMMEDIATE failed; dropping audit row"
5503 );
5504 return;
5505 }
5506 };
5507 if let Err(e) = insert_audit_row_in_tx(&tx, &event) {
5508 tracing::warn!(
5509 error = %e,
5510 operation = %operation,
5511 "audit emit: INSERT failed; dropping audit row"
5512 );
5513 return;
5514 }
5515 if let Err(e) = tx.commit() {
5516 tracing::warn!(
5517 error = %e,
5518 operation = %operation,
5519 "audit emit: COMMIT failed; dropping audit row"
5520 );
5521 }
5522 }
5523
5524 fn shutdown(&mut self) {
5525 if let Err(e) = self
5526 .conn
5527 .pragma_update(None, "wal_checkpoint", "TRUNCATE")
5528 {
5529 tracing::warn!(error = %e, "wal_checkpoint(TRUNCATE) on shutdown failed");
5530 }
5531 tracing::info!("writer actor shutdown complete");
5532 }
5533}
5534
5535fn redaction_audit_event(
5546 ts_ms: i64,
5547 principal_subject: Option<String>,
5548 target_id: Option<String>,
5549 matches: &[crate::redaction::RedactionMatch],
5550) -> AuditEvent {
5551 let details_matches: Vec<serde_json::Value> = matches
5552 .iter()
5553 .map(|m| {
5554 serde_json::json!({
5555 "pattern_name": m.pattern_name,
5556 "count": m.count,
5557 })
5558 })
5559 .collect();
5560 AuditEvent {
5561 ts_ms,
5562 principal_subject,
5563 operation: AuditOperation::RedactionApplied,
5564 target_id,
5565 result: AuditResult::Ok,
5566 details: Some(serde_json::json!({ "matches": details_matches })),
5567 }
5568}
5569
/// Picks a title for a document.
///
/// Looks at the first 64 lines of `text` for a line that, after leading
/// whitespace, starts with `#`. The leading run of `#`, surrounding
/// whitespace, and any trailing run of `#` are stripped; the first such line
/// with text left over is the title.
///
/// When no line qualifies, the file stem of `path` is used, and
/// `"(untitled)"` when the path has no UTF-8 file stem.
fn derive_document_title(text: &str, path: &std::path::Path) -> String {
    let heading = text
        .lines()
        .take(64)
        .filter_map(|line| line.trim_start().strip_prefix('#'))
        .map(|rest| {
            rest.trim_start_matches('#')
                .trim()
                .trim_end_matches('#')
                .trim()
        })
        .find(|candidate| !candidate.is_empty());

    if let Some(title) = heading {
        return title.to_string();
    }

    match path.file_stem().and_then(|stem| stem.to_str()) {
        Some(stem) => stem.to_string(),
        None => "(untitled)".to_string(),
    }
}
5596
/// Describes a write that would push a tenant past its configured quota.
///
/// Its `Display` output names all three figures and tells the operator how to
/// raise the quota.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) struct QuotaExceededError {
    /// Size observed when the quota was checked (presumably bytes, matching
    /// `quota` — confirm at the construction site).
    pub current_size: u64,
    /// Growth the rejected write was estimated to add (presumably bytes).
    pub estimated_growth: u64,
    /// The configured limit; the `Display` text reports it as `quota_bytes`.
    pub quota: u64,
}
5607
5608impl QuotaExceededError {
5609 pub fn to_details_json(self) -> serde_json::Value {
5613 serde_json::json!({
5614 "reason": "quota_exceeded",
5615 "current_size": self.current_size,
5616 "estimated_growth": self.estimated_growth,
5617 "quota": self.quota,
5618 })
5619 }
5620}
5621
5622impl std::fmt::Display for QuotaExceededError {
5623 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
5624 write!(
5625 f,
5626 "tenant quota_bytes={} would be exceeded (current_size={}, estimated_growth={}). \
5627 Increase the quota via `solo tenants set-quota <id> --bytes <N>` or \
5628 `--unlimited`.",
5629 self.quota, self.current_size, self.estimated_growth
5630 )
5631 }
5632}
5633
/// Outcome of a quota check, as produced by `check_quota`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum QuotaDecision {
    /// No limit applies: either no quota is configured or no database path
    /// was supplied to measure against.
    Unlimited,
    /// The current size plus the estimated growth does not exceed the quota.
    Allowed { current_size: u64, quota: u64 },
    /// The current size plus the estimated growth is larger than the quota.
    Exceeded {
        /// Size of the database file as read from file metadata.
        current_size: u64,
        /// Growth estimate the caller supplied for the pending write.
        estimated_growth: u64,
        /// The configured limit that would be crossed.
        quota: u64,
    },
}
5658
5659pub(crate) fn check_quota(
5674 quota_bytes: Option<u64>,
5675 db_path: Option<&std::path::Path>,
5676 estimated_growth: u64,
5677) -> QuotaDecision {
5678 let Some(quota) = quota_bytes else {
5679 return QuotaDecision::Unlimited;
5680 };
5681 let Some(path) = db_path else {
5682 return QuotaDecision::Unlimited;
5685 };
5686 let current_size = std::fs::metadata(path).map(|m| m.len()).unwrap_or(0);
5687 if current_size.saturating_add(estimated_growth) > quota {
5688 QuotaDecision::Exceeded {
5689 current_size,
5690 estimated_growth,
5691 quota,
5692 }
5693 } else {
5694 QuotaDecision::Allowed {
5695 current_size,
5696 quota,
5697 }
5698 }
5699}
5700
5701fn resolve_source_episode_id(
5715 conn: &rusqlite::Connection,
5716 provenance: &solo_core::Provenance,
5717) -> Option<i64> {
5718 let first = provenance.derived_from.first()?;
5719 let memory_id_str = first.to_string();
5720 conn.query_row(
5721 "SELECT rowid FROM episodes WHERE memory_id = ?",
5722 params![memory_id_str],
5723 |r| r.get::<_, i64>(0),
5724 )
5725 .optional()
5726 .ok()
5727 .flatten()
5728}
5729
5730fn resolve_source_episode_id_in_tx(
5734 tx: &rusqlite::Transaction<'_>,
5735 provenance: &solo_core::Provenance,
5736) -> Option<i64> {
5737 let first = provenance.derived_from.first()?;
5738 let memory_id_str = first.to_string();
5739 tx.query_row(
5740 "SELECT rowid FROM episodes WHERE memory_id = ?",
5741 params![memory_id_str],
5742 |r| r.get::<_, i64>(0),
5743 )
5744 .optional()
5745 .ok()
5746 .flatten()
5747}
5748
5749#[cfg(test)]
5750mod tests {
5751 use super::*;
5752 #[allow(unused_imports)]
5753 use crate::test_support::{
5754 StubVectorIndex, disabled_test_redactor, enabled_test_redactor, fixture_episode,
5755 fixture_embedding, open_test_db,
5756 };
5757 use std::time::Duration;
5758
5759 fn rt() -> tokio::runtime::Runtime {
5760 tokio::runtime::Builder::new_current_thread()
5761 .enable_all()
5762 .build()
5763 .unwrap()
5764 }
5765
    /// `remember` through a spawned writer returns the episode's own memory
    /// id, and the stub index receives exactly one vector of the expected
    /// length under rowid 1.
    #[test]
    fn remember_happy_path_round_trip() {
        let (conn, _tmp) = open_test_db();
        let hnsw = Arc::new(StubVectorIndex::new(4));
        let WriterSpawn { handle, join: _ } = WriterActor::spawn(conn, hnsw.clone());

        let episode = fixture_episode("test content");
        let embedding = fixture_embedding(4);
        let mid = rt()
            .block_on(handle.remember(episode.clone(), embedding))
            .unwrap();
        assert_eq!(mid, episode.memory_id);

        // Sleep, drop the handle, sleep again before inspecting the stub
        // index (presumably to give the writer thread time to finish its
        // index work — confirm against the actor loop).
        std::thread::sleep(Duration::from_millis(50));
        drop(handle);
        std::thread::sleep(Duration::from_millis(50));

        assert_eq!(hnsw.add_count(), 1);
        let added = hnsw.last_added().unwrap();
        assert_eq!(added.0, 1, "rowid should be 1 (first insert)");
        assert_eq!(added.1.len(), 4);
    }
5788
    /// Calls `dispatch_remember` directly on an actor built in the test
    /// thread and checks that the reply carries the episode's memory id and
    /// that `pending_index` holds no rows afterwards.
    #[test]
    fn dispatch_remember_replies_before_drain() {
        let (conn, _tmp) = open_test_db();
        let hnsw = Arc::new(StubVectorIndex::new(4));
        // The sender half is kept alive but unused; no command is ever sent.
        let (_tx, rx) = mpsc::channel(1);
        // Minimal actor: every optional collaborator is absent.
        let mut actor = WriterActor {
            conn,
            hnsw,
            rx,
            snapshot_dir: None,
            embedder_id: None,
            embedder: None,
            runtime_handle: None,
            steward: None,
            steward_slot: None,
            triples_batch_signal: None,
            key: None,
            redactor: disabled_test_redactor(),
            quota_bytes: None,
            db_path: None,
            invalidate_tx: None,
            invalidate_tenant_id: None,
        };
        let (reply_tx, reply_rx) = oneshot::channel();
        let episode = fixture_episode("ordering test");
        let embedding = fixture_embedding(4);

        actor.dispatch_remember(episode.clone(), embedding, None, reply_tx);

        // The reply must already be available once the synchronous call
        // returns.
        let received = reply_rx.blocking_recv().unwrap();
        assert_eq!(received.unwrap(), episode.memory_id);

        let n: u32 = actor
            .conn
            .query_row("SELECT COUNT(*) FROM pending_index", [], |row| row.get(0))
            .unwrap();
        assert_eq!(n, 0);
    }
5827
    /// Forgetting a memory id that was never stored fails with an error whose
    /// text contains "not found".
    #[test]
    fn forget_unknown_memory_id_returns_not_found() {
        let (conn, _tmp) = open_test_db();
        let hnsw = Arc::new(StubVectorIndex::new(4));
        let WriterSpawn { handle, join: _ } = WriterActor::spawn(conn, hnsw);

        // A freshly generated id that the database has never seen.
        let mid = MemoryId::new();
        let err = rt()
            .block_on(handle.forget(mid, "test".into()))
            .unwrap_err();
        assert!(err.to_string().contains("not found"), "got: {err}");
    }
5840
    /// Remembers an episode, forgets it, and checks that the index saw one
    /// add.
    ///
    /// NOTE(review): despite the name, this test does not read the episode's
    /// `status` back; it only proves that `forget` succeeds on a stored
    /// memory.
    #[test]
    fn forget_marks_status_forgotten() {
        let (conn, _tmp) = open_test_db();
        let hnsw = Arc::new(StubVectorIndex::new(4));
        let WriterSpawn { handle, join: _ } = WriterActor::spawn(conn, hnsw.clone());

        let episode = fixture_episode("to be forgotten");
        let mid = rt()
            .block_on(handle.remember(episode.clone(), fixture_embedding(4)))
            .unwrap();
        rt().block_on(handle.forget(mid, "no longer relevant".into()))
            .unwrap();

        assert_eq!(hnsw.add_count(), 1);
        // `mid` is referenced once more and otherwise unused from here on.
        let _ = mid;
    }
5868
    /// Forgetting the same memory twice succeeds both times.
    #[test]
    fn forget_is_idempotent_when_already_forgotten() {
        let (conn, _tmp) = open_test_db();
        let hnsw = Arc::new(StubVectorIndex::new(4));
        let WriterSpawn { handle, join: _ } = WriterActor::spawn(conn, hnsw);

        let episode = fixture_episode("forget twice");
        let mid = rt()
            .block_on(handle.remember(episode, fixture_embedding(4)))
            .unwrap();
        rt().block_on(handle.forget(mid, "first".into())).unwrap();
        // The second call targets an already-forgotten memory and must still
        // return Ok.
        rt().block_on(handle.forget(mid, "second".into())).unwrap();
    }
5883
    /// Fires 50 `remember` calls from tasks on a four-worker runtime and
    /// checks that all succeed, all memory ids are distinct, and the index
    /// recorded 50 adds.
    #[test]
    fn many_concurrent_writes_serialize_correctly() {
        let (conn, _tmp) = open_test_db();
        let hnsw = Arc::new(StubVectorIndex::new(4));
        let WriterSpawn { handle, join: _ } = WriterActor::spawn(conn, hnsw.clone());

        let runtime = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(4)
            .enable_all()
            .build()
            .unwrap();

        let results: Vec<Result<MemoryId>> = runtime.block_on(async {
            // Spawn every task first so the writes can overlap, then await
            // them in spawn order.
            let mut tasks = Vec::new();
            for i in 0..50 {
                let h = handle.clone();
                let ep = fixture_episode(&format!("write {i}"));
                tasks.push(tokio::spawn(async move {
                    h.remember(ep, fixture_embedding(4)).await
                }));
            }
            let mut out = Vec::new();
            for t in tasks {
                out.push(t.await.unwrap());
            }
            out
        });

        // `insert` returns false on a duplicate, which would fail the assert.
        let mut ids = std::collections::HashSet::new();
        for r in results {
            let mid = r.expect("remember must succeed");
            assert!(ids.insert(mid), "memory_ids must be unique");
        }
        assert_eq!(ids.len(), 50);
        assert_eq!(hnsw.add_count(), 50);
    }
5920
5921 fn seed_triple(
5940 conn: &Connection,
5941 triple_id: &str,
5942 subject: &str,
5943 predicate: &str,
5944 object: &str,
5945 object_kind: &str,
5946 ) {
5947 let now_ms = chrono::Utc::now().timestamp_millis();
5948 conn.execute(
5949 "INSERT INTO triples (
5950 triple_id, subject_id, predicate, object_id, object_kind,
5951 valid_from_ms, valid_to_ms, confidence, provenance_json,
5952 created_at_ms, updated_at_ms
5953 ) VALUES (?, ?, ?, ?, ?, ?, NULL, ?, ?, ?, ?)",
5954 params![
5955 triple_id,
5956 subject,
5957 predicate,
5958 object,
5959 object_kind,
5960 now_ms,
5961 0.9_f64,
5962 "{}",
5963 now_ms,
5964 now_ms,
5965 ],
5966 )
5967 .expect("seed triple");
5968 }
5969
5970 fn read_subject(conn: &Connection, triple_id: &str) -> String {
5973 conn.query_row(
5974 "SELECT subject_id FROM triples WHERE triple_id = ?",
5975 params![triple_id],
5976 |r| r.get::<_, String>(0),
5977 )
5978 .expect("read subject_id")
5979 }
5980
5981 fn read_object(conn: &Connection, triple_id: &str) -> String {
5983 conn.query_row(
5984 "SELECT object_id FROM triples WHERE triple_id = ?",
5985 params![triple_id],
5986 |r| r.get::<_, String>(0),
5987 )
5988 .expect("read object_id")
5989 }
5990
    /// Builds a `WriterActor` around `conn` without spawning a thread, so
    /// tests can call its handler methods directly.
    ///
    /// The actor gets a four-dimensional stub index, a disabled redactor, and
    /// no embedder, steward, key, quota, snapshot directory, or invalidate
    /// channel.
    fn build_actor_inline(conn: Connection) -> WriterActor {
        // The sender half goes out of scope when this function returns; the
        // receiver only exists to fill the `rx` field.
        let (_tx, rx) = mpsc::channel(1);
        let hnsw = Arc::new(StubVectorIndex::new(4));
        WriterActor {
            conn,
            hnsw,
            rx,
            snapshot_dir: None,
            embedder_id: None,
            embedder: None,
            runtime_handle: None,
            steward: None,
            steward_slot: None,
            triples_batch_signal: None,
            key: None,
            redactor: disabled_test_redactor(),
            quota_bytes: None,
            db_path: None,
            invalidate_tx: None,
            invalidate_tenant_id: None,
        }
    }
6016
6017 #[test]
6018 fn normalize_subjects_updates_subject_column() {
6019 let (conn, _tmp) = open_test_db();
6020 seed_triple(&conn, "t1", "alex", "uses", "rust", "literal");
6021 let mut actor = build_actor_inline(conn);
6022
6023 let report = actor
6024 .handle_normalize_subjects(
6025 vec![("alex".into(), "user".into())],
6026 false,
6027 None,
6028 )
6029 .expect("normalize ok");
6030
6031 assert_eq!(report.aliases_processed, 1);
6032 assert_eq!(report.subject_rows_updated, 1);
6033 assert_eq!(report.object_rows_updated, 0);
6034 assert!(!report.dry_run);
6035 assert_eq!(read_subject(&actor.conn, "t1"), "user");
6036 assert_eq!(read_object(&actor.conn, "t1"), "rust");
6037 }
6038
6039 #[test]
6040 fn normalize_subjects_updates_object_column() {
6041 let (conn, _tmp) = open_test_db();
6042 seed_triple(&conn, "t1", "bob", "knows", "alex", "entity");
6044 let mut actor = build_actor_inline(conn);
6045
6046 let report = actor
6047 .handle_normalize_subjects(
6048 vec![("alex".into(), "user".into())],
6049 false,
6050 None,
6051 )
6052 .expect("normalize ok");
6053
6054 assert_eq!(report.subject_rows_updated, 0);
6055 assert_eq!(report.object_rows_updated, 1);
6056 assert_eq!(read_subject(&actor.conn, "t1"), "bob");
6057 assert_eq!(read_object(&actor.conn, "t1"), "user");
6058 }
6059
6060 #[test]
6061 fn normalize_subjects_updates_both_when_subject_equals_object() {
6062 let (conn, _tmp) = open_test_db();
6066 seed_triple(&conn, "t1", "alex", "is", "alex", "entity");
6067 let mut actor = build_actor_inline(conn);
6068
6069 let report = actor
6070 .handle_normalize_subjects(
6071 vec![("alex".into(), "user".into())],
6072 false,
6073 None,
6074 )
6075 .expect("normalize ok");
6076
6077 assert_eq!(report.subject_rows_updated, 1);
6078 assert_eq!(report.object_rows_updated, 1);
6079 assert_eq!(read_subject(&actor.conn, "t1"), "user");
6080 assert_eq!(read_object(&actor.conn, "t1"), "user");
6081 }
6082
6083 #[test]
6084 fn normalize_subjects_dry_run_rolls_back() {
6085 let (conn, _tmp) = open_test_db();
6086 seed_triple(&conn, "t1", "alex", "uses", "rust", "literal");
6087 seed_triple(&conn, "t2", "bob", "knows", "alex", "entity");
6088 let mut actor = build_actor_inline(conn);
6089
6090 let report = actor
6091 .handle_normalize_subjects(
6092 vec![("alex".into(), "user".into())],
6093 true,
6094 None,
6095 )
6096 .expect("dry-run normalize ok");
6097
6098 assert!(report.dry_run);
6100 assert_eq!(report.subject_rows_updated, 1);
6101 assert_eq!(report.object_rows_updated, 1);
6102 assert_eq!(read_subject(&actor.conn, "t1"), "alex");
6104 assert_eq!(read_object(&actor.conn, "t1"), "rust");
6105 assert_eq!(read_subject(&actor.conn, "t2"), "bob");
6106 assert_eq!(read_object(&actor.conn, "t2"), "alex");
6107 }
6108
6109 #[test]
6110 fn normalize_subjects_multiple_aliases() {
6111 let (conn, _tmp) = open_test_db();
6112 seed_triple(&conn, "t1", "alex", "uses", "rust", "literal");
6113 seed_triple(&conn, "t2", "bob", "uses", "python", "literal");
6114 seed_triple(&conn, "t3", "charlie", "knows", "alex", "entity");
6115 let mut actor = build_actor_inline(conn);
6116
6117 let report = actor
6118 .handle_normalize_subjects(
6119 vec![
6120 ("alex".into(), "user".into()),
6121 ("bob".into(), "user".into()),
6122 ],
6123 false,
6124 None,
6125 )
6126 .expect("normalize ok");
6127
6128 assert_eq!(report.aliases_processed, 2);
6129 assert_eq!(report.subject_rows_updated, 2);
6131 assert_eq!(report.object_rows_updated, 1);
6133
6134 assert_eq!(read_subject(&actor.conn, "t1"), "user");
6135 assert_eq!(read_subject(&actor.conn, "t2"), "user");
6136 assert_eq!(read_object(&actor.conn, "t3"), "user");
6137 assert_eq!(read_subject(&actor.conn, "t3"), "charlie");
6139 }
6140
6141 #[test]
6142 fn normalize_subjects_no_match_returns_zero_counts() {
6143 let (conn, _tmp) = open_test_db();
6144 seed_triple(&conn, "t1", "alex", "uses", "rust", "literal");
6145 let mut actor = build_actor_inline(conn);
6146
6147 let report = actor
6148 .handle_normalize_subjects(
6149 vec![("nobody".into(), "user".into())],
6150 false,
6151 None,
6152 )
6153 .expect("normalize ok");
6154
6155 assert_eq!(report.aliases_processed, 1);
6156 assert_eq!(report.subject_rows_updated, 0);
6157 assert_eq!(report.object_rows_updated, 0);
6158 assert_eq!(read_subject(&actor.conn, "t1"), "alex");
6160 }
6161
6162 #[test]
6163 fn normalize_subjects_empty_alias_list_is_noop() {
6164 let (conn, _tmp) = open_test_db();
6165 seed_triple(&conn, "t1", "alex", "uses", "rust", "literal");
6166 let mut actor = build_actor_inline(conn);
6167
6168 let report = actor
6169 .handle_normalize_subjects(vec![], false, None)
6170 .expect("normalize ok");
6171
6172 assert_eq!(report.aliases_processed, 0);
6173 assert_eq!(report.subject_rows_updated, 0);
6174 assert_eq!(report.object_rows_updated, 0);
6175 assert_eq!(read_subject(&actor.conn, "t1"), "alex");
6176 }
6177
6178 #[test]
6179 fn normalize_subjects_via_handle_round_trip() {
6180 let (conn, tmp) = open_test_db();
6183 seed_triple(&conn, "t1", "alex", "uses", "rust", "literal");
6184 seed_triple(&conn, "t2", "bob", "knows", "alex", "entity");
6185 drop(conn);
6189 let conn = crate::test_support::open_test_db_at(&tmp.path().join("test.db"));
6190
6191 let hnsw = Arc::new(StubVectorIndex::new(4));
6192 let WriterSpawn { handle, join } = WriterActor::spawn(conn, hnsw);
6193
6194 let report = rt()
6195 .block_on(handle.normalize_subjects(
6196 vec![("alex".into(), "user".into())],
6197 false,
6198 ))
6199 .expect("normalize via handle");
6200 assert_eq!(report.subject_rows_updated, 1);
6201 assert_eq!(report.object_rows_updated, 1);
6202
6203 drop(handle);
6204 join.join().expect("writer thread joins");
6205
6206 let conn = crate::test_support::open_test_db_at(&tmp.path().join("test.db"));
6209 let subj: String = conn
6210 .query_row(
6211 "SELECT subject_id FROM triples WHERE triple_id = 't1'",
6212 [],
6213 |r| r.get(0),
6214 )
6215 .unwrap();
6216 let obj: String = conn
6217 .query_row(
6218 "SELECT object_id FROM triples WHERE triple_id = 't2'",
6219 [],
6220 |r| r.get(0),
6221 )
6222 .unwrap();
6223 assert_eq!(subj, "user");
6224 assert_eq!(obj, "user");
6225 }
6226
6227 use crate::document::ChunkConfig;
6240 use crate::embedder::StubEmbedder;
6241 use crate::embedder_registry::{EmbedderIdentity, get_or_insert_embedder_id};
6242 use solo_core::{ChunkId, DocumentId, Embedder};
6243
6244 fn build_ingest_actor(
6248 conn: Connection,
6249 ) -> (WriterActor, tokio::runtime::Runtime, Arc<StubVectorIndex>) {
6250 let runtime = tokio::runtime::Builder::new_multi_thread()
6253 .worker_threads(2)
6254 .enable_all()
6255 .build()
6256 .unwrap();
6257 let handle = runtime.handle().clone();
6258 let embedder: Arc<dyn Embedder> = Arc::new(StubEmbedder::new("stub", "v1", 4));
6259 let identity = EmbedderIdentity::from_embedder(embedder.as_ref());
6260 let embedder_id = get_or_insert_embedder_id(&conn, &identity).unwrap();
6261 let hnsw = Arc::new(StubVectorIndex::new(4));
6262 let (_tx, rx) = mpsc::channel(1);
6263 let actor = WriterActor {
6264 conn,
6265 hnsw: hnsw.clone(),
6266 rx,
6267 snapshot_dir: None,
6268 embedder_id: Some(embedder_id),
6269 embedder: Some(embedder),
6270 runtime_handle: Some(handle),
6271 steward: None,
6272 steward_slot: None,
6273 triples_batch_signal: None,
6274 key: None,
6275 redactor: disabled_test_redactor(),
6276 quota_bytes: None,
6277 db_path: None,
6278 invalidate_tx: None,
6279 invalidate_tenant_id: None,
6280 };
6281 (actor, runtime, hnsw)
6282 }
6283
6284 fn write_markdown(tmp: &tempfile::TempDir, name: &str, body: &str) -> std::path::PathBuf {
6286 let path = tmp.path().join(name);
6287 std::fs::write(&path, body).expect("write fixture");
6288 path
6289 }
6290
6291 #[derive(Debug)]
6293 struct FailingEmbedder {
6294 dim: usize,
6295 }
6296
6297 #[async_trait::async_trait]
6298 impl Embedder for FailingEmbedder {
6299 fn name(&self) -> &str {
6300 "fail"
6301 }
6302 fn version(&self) -> &str {
6303 "v1"
6304 }
6305 fn dim(&self) -> usize {
6306 self.dim
6307 }
6308 fn dtype(&self) -> solo_core::EmbeddingDtype {
6309 solo_core::EmbeddingDtype::F32
6310 }
6311 async fn embed_batch(
6312 &self,
6313 _texts: &[&str],
6314 ) -> Result<Vec<Embedding>> {
6315 Err(solo_core::Error::embedder("forced failure for test"))
6316 }
6317 }
6318
6319 #[test]
6322 fn ingest_document_persists_doc_and_chunks() {
6323 let (conn, _tmp) = open_test_db();
6324 let (mut actor, _rt, hnsw) = build_ingest_actor(conn);
6325
6326 let docs_tmp = tempfile::TempDir::new().unwrap();
6327 let path = write_markdown(
6328 &docs_tmp,
6329 "intro.md",
6330 "# Intro\n\nFirst paragraph here.\n\nSecond paragraph here.\n",
6331 );
6332
6333 let (reply_tx, reply_rx) = oneshot::channel();
6334 actor.dispatch_ingest_document(path.clone(), ChunkConfig::default(), None, reply_tx);
6335 let report = reply_rx.blocking_recv().unwrap().expect("ingest ok");
6336
6337 assert!(!report.deduped);
6338 assert_eq!(report.chunks_persisted, 1, "tiny doc → one chunk");
6339 assert!(report.bytes_ingested > 0);
6340
6341 let (status, title, chunk_count): (String, String, i64) = actor
6343 .conn
6344 .query_row(
6345 "SELECT status, title, chunk_count FROM documents WHERE doc_id = ?",
6346 params![report.doc_id.to_string()],
6347 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
6348 )
6349 .unwrap();
6350 assert_eq!(status, "active");
6351 assert_eq!(title, "Intro", "first markdown heading becomes title");
6352 assert_eq!(chunk_count, 1);
6353
6354 let n_chunks: i64 = actor
6356 .conn
6357 .query_row(
6358 "SELECT COUNT(*) FROM document_chunks WHERE doc_id = ?",
6359 params![report.doc_id.to_string()],
6360 |r| r.get(0),
6361 )
6362 .unwrap();
6363 assert_eq!(n_chunks, 1);
6364
6365 let n_emb: i64 = actor
6367 .conn
6368 .query_row(
6369 "SELECT COUNT(*) FROM chunk_embeddings ce
6370 JOIN document_chunks dc ON dc.chunk_id = ce.chunk_id
6371 WHERE dc.doc_id = ?",
6372 params![report.doc_id.to_string()],
6373 |r| r.get(0),
6374 )
6375 .unwrap();
6376 assert_eq!(n_emb, 1);
6377
6378 assert_eq!(hnsw.add_count(), 1);
6380 }
6381
6382 #[test]
6383 fn ingest_document_pending_index_drains_cleanly() {
6384 let (conn, _tmp) = open_test_db();
6385 let (mut actor, _rt, _hnsw) = build_ingest_actor(conn);
6386
6387 let docs_tmp = tempfile::TempDir::new().unwrap();
6388 let path = write_markdown(&docs_tmp, "doc.md", "# Doc\n\nBody text.\n");
6389
6390 let (reply_tx, reply_rx) = oneshot::channel();
6391 actor.dispatch_ingest_document(path, ChunkConfig::default(), None, reply_tx);
6392 let _ = reply_rx.blocking_recv().unwrap().expect("ingest ok");
6393
6394 let pending: i64 = actor
6395 .conn
6396 .query_row(
6397 "SELECT COUNT(*) FROM pending_index WHERE kind = 'chunk'",
6398 [],
6399 |r| r.get(0),
6400 )
6401 .unwrap();
6402 assert_eq!(pending, 0, "pending_index chunk rows fully drained");
6403 }
6404
6405 #[test]
6406 fn ingest_document_is_idempotent_by_content_hash() {
6407 let (conn, _tmp) = open_test_db();
6408 let (mut actor, _rt, hnsw) = build_ingest_actor(conn);
6409
6410 let docs_tmp = tempfile::TempDir::new().unwrap();
6411 let path = write_markdown(&docs_tmp, "same.md", "# Same\n\nDeterministic body.\n");
6412
6413 let (reply_tx, reply_rx) = oneshot::channel();
6415 actor.dispatch_ingest_document(path.clone(), ChunkConfig::default(), None, reply_tx);
6416 let report1 = reply_rx.blocking_recv().unwrap().unwrap();
6417 assert!(!report1.deduped);
6418 assert_eq!(report1.chunks_persisted, 1);
6419 let hnsw_after_first = hnsw.add_count();
6420
6421 let (reply_tx, reply_rx) = oneshot::channel();
6423 actor.dispatch_ingest_document(path, ChunkConfig::default(), None, reply_tx);
6424 let report2 = reply_rx.blocking_recv().unwrap().unwrap();
6425 assert!(report2.deduped);
6426 assert_eq!(report2.doc_id, report1.doc_id);
6427 assert_eq!(report2.chunks_persisted, 0);
6428 assert_eq!(
6429 hnsw.add_count(),
6430 hnsw_after_first,
6431 "dedup hit must not embed or add to HNSW"
6432 );
6433
6434 let n_docs: i64 = actor
6436 .conn
6437 .query_row("SELECT COUNT(*) FROM documents", [], |r| r.get(0))
6438 .unwrap();
6439 assert_eq!(n_docs, 1);
6440 }
6441
6442 #[test]
6443 fn ingest_document_rolls_back_on_embed_failure() {
6444 let (conn, _tmp) = open_test_db();
6446 let runtime = tokio::runtime::Builder::new_multi_thread()
6447 .worker_threads(2)
6448 .enable_all()
6449 .build()
6450 .unwrap();
6451 let handle = runtime.handle().clone();
6452 let embedder: Arc<dyn Embedder> = Arc::new(FailingEmbedder { dim: 4 });
6453 let identity = EmbedderIdentity::from_embedder(embedder.as_ref());
6454 let embedder_id = get_or_insert_embedder_id(&conn, &identity).unwrap();
6455 let hnsw = Arc::new(StubVectorIndex::new(4));
6456 let (_tx, rx) = mpsc::channel(1);
6457 let mut actor = WriterActor {
6458 conn,
6459 hnsw: hnsw.clone(),
6460 rx,
6461 snapshot_dir: None,
6462 embedder_id: Some(embedder_id),
6463 embedder: Some(embedder),
6464 runtime_handle: Some(handle),
6465 steward: None,
6466 steward_slot: None,
6467 triples_batch_signal: None,
6468 key: None,
6469 redactor: disabled_test_redactor(),
6470 quota_bytes: None,
6471 db_path: None,
6472 invalidate_tx: None,
6473 invalidate_tenant_id: None,
6474 };
6475
6476 let docs_tmp = tempfile::TempDir::new().unwrap();
6477 let path = write_markdown(&docs_tmp, "fail.md", "# Fail\n\nBody.\n");
6478
6479 let (reply_tx, reply_rx) = oneshot::channel();
6480 actor.dispatch_ingest_document(path, ChunkConfig::default(), None, reply_tx);
6481 let err = reply_rx.blocking_recv().unwrap().unwrap_err();
6482 assert!(err.to_string().contains("embed_batch"), "got: {err}");
6483
6484 let n_docs: i64 = actor
6486 .conn
6487 .query_row("SELECT COUNT(*) FROM documents", [], |r| r.get(0))
6488 .unwrap();
6489 assert_eq!(n_docs, 0);
6490 let n_chunks: i64 = actor
6491 .conn
6492 .query_row("SELECT COUNT(*) FROM document_chunks", [], |r| r.get(0))
6493 .unwrap();
6494 assert_eq!(n_chunks, 0);
6495 let n_pending: i64 = actor
6496 .conn
6497 .query_row("SELECT COUNT(*) FROM pending_index", [], |r| r.get(0))
6498 .unwrap();
6499 assert_eq!(n_pending, 0);
6500 assert_eq!(hnsw.add_count(), 0);
6502 }
6503
6504 #[test]
6505 fn ingest_document_large_document() {
6506 let (conn, _tmp) = open_test_db();
6509 let (mut actor, _rt, hnsw) = build_ingest_actor(conn);
6510
6511 let docs_tmp = tempfile::TempDir::new().unwrap();
6512 let mut body = String::from("# Header\n\n");
6513 for i in 0..30 {
6514 body.push_str(&format!(
6515 "Paragraph number {i} with several words in it.\n\n"
6516 ));
6517 }
6518 let path = write_markdown(&docs_tmp, "big.md", &body);
6519
6520 let (reply_tx, reply_rx) = oneshot::channel();
6521 actor.dispatch_ingest_document(
6522 path,
6523 ChunkConfig {
6524 target_tokens: 80,
6525 overlap_tokens: 10,
6526 },
6527 None,
6528 reply_tx,
6529 );
6530 let report = reply_rx.blocking_recv().unwrap().unwrap();
6531
6532 assert!(
6533 report.chunks_persisted >= 2,
6534 "expected multi-chunk, got {}",
6535 report.chunks_persisted
6536 );
6537 let n_chunks: i64 = actor
6538 .conn
6539 .query_row(
6540 "SELECT COUNT(*) FROM document_chunks WHERE doc_id = ?",
6541 params![report.doc_id.to_string()],
6542 |r| r.get(0),
6543 )
6544 .unwrap();
6545 assert_eq!(n_chunks as u32, report.chunks_persisted);
6546 assert_eq!(hnsw.add_count() as u32, report.chunks_persisted);
6547 }
6548
6549 #[test]
6550 fn ingest_document_uses_first_heading_as_title() {
6551 let (conn, _tmp) = open_test_db();
6552 let (mut actor, _rt, _hnsw) = build_ingest_actor(conn);
6553
6554 let docs_tmp = tempfile::TempDir::new().unwrap();
6555 let path = write_markdown(
6556 &docs_tmp,
6557 "any_name.md",
6558 "Preamble line without heading.\n\n## Sub Section Title\n\nBody.\n",
6559 );
6560
6561 let (reply_tx, reply_rx) = oneshot::channel();
6562 actor.dispatch_ingest_document(path, ChunkConfig::default(), None, reply_tx);
6563 let report = reply_rx.blocking_recv().unwrap().unwrap();
6564
6565 let title: String = actor
6566 .conn
6567 .query_row(
6568 "SELECT title FROM documents WHERE doc_id = ?",
6569 params![report.doc_id.to_string()],
6570 |r| r.get(0),
6571 )
6572 .unwrap();
6573 assert_eq!(
6574 title, "Sub Section Title",
6575 "title comes from first heading line"
6576 );
6577 }
6578
6579 #[test]
6580 fn ingest_document_records_file_mtime() {
6581 let (conn, _tmp) = open_test_db();
6582 let (mut actor, _rt, _hnsw) = build_ingest_actor(conn);
6583
6584 let docs_tmp = tempfile::TempDir::new().unwrap();
6585 let path = write_markdown(&docs_tmp, "mtime.md", "# T\n\nBody.\n");
6586 let fs_mtime_ms = std::fs::metadata(&path)
6587 .unwrap()
6588 .modified()
6589 .unwrap()
6590 .duration_since(std::time::UNIX_EPOCH)
6591 .unwrap()
6592 .as_millis() as i64;
6593
6594 let (reply_tx, reply_rx) = oneshot::channel();
6595 actor.dispatch_ingest_document(path, ChunkConfig::default(), None, reply_tx);
6596 let report = reply_rx.blocking_recv().unwrap().unwrap();
6597
6598 let modified_at_ms: Option<i64> = actor
6599 .conn
6600 .query_row(
6601 "SELECT modified_at_ms FROM documents WHERE doc_id = ?",
6602 params![report.doc_id.to_string()],
6603 |r| r.get(0),
6604 )
6605 .unwrap();
6606 let m = modified_at_ms.expect("modified_at_ms must be set when file mtime is readable");
6607 assert!(
6609 (m - fs_mtime_ms).abs() < 2_000,
6610 "modified_at_ms drift: db={m} fs={fs_mtime_ms}"
6611 );
6612 }
6613
6614 #[test]
6615 fn ingest_document_unsupported_extension_errors_cleanly() {
6616 let (conn, _tmp) = open_test_db();
6617 let (mut actor, _rt, hnsw) = build_ingest_actor(conn);
6618
6619 let docs_tmp = tempfile::TempDir::new().unwrap();
6620 let path = docs_tmp.path().join("blob.bin");
6621 std::fs::write(&path, b"\x00\x01\x02").unwrap();
6622
6623 let (reply_tx, reply_rx) = oneshot::channel();
6624 actor.dispatch_ingest_document(path, ChunkConfig::default(), None, reply_tx);
6625 let err = reply_rx.blocking_recv().unwrap().unwrap_err();
6626 assert!(
6627 err.to_string().contains("parse") || err.to_string().contains("extension"),
6628 "unsupported extension should surface as a parse error: {err}"
6629 );
6630 let n_docs: i64 = actor
6632 .conn
6633 .query_row("SELECT COUNT(*) FROM documents", [], |r| r.get(0))
6634 .unwrap();
6635 assert_eq!(n_docs, 0);
6636 assert_eq!(hnsw.add_count(), 0);
6637 }
6638
6639 #[test]
6640 fn ingest_document_writes_embedding_dim_correctly() {
6641 let (conn, _tmp) = open_test_db();
6642 let (mut actor, _rt, _hnsw) = build_ingest_actor(conn);
6643
6644 let docs_tmp = tempfile::TempDir::new().unwrap();
6645 let path = write_markdown(&docs_tmp, "dim.md", "# Dim\n\nText.\n");
6646
6647 let (reply_tx, reply_rx) = oneshot::channel();
6648 actor.dispatch_ingest_document(path, ChunkConfig::default(), None, reply_tx);
6649 let report = reply_rx.blocking_recv().unwrap().unwrap();
6650
6651 let dim: i64 = actor
6652 .conn
6653 .query_row(
6654 "SELECT ce.dim FROM chunk_embeddings ce
6655 JOIN document_chunks dc ON dc.chunk_id = ce.chunk_id
6656 WHERE dc.doc_id = ?",
6657 params![report.doc_id.to_string()],
6658 |r| r.get(0),
6659 )
6660 .unwrap();
6661 assert_eq!(dim, 4, "stub embedder dim is 4");
6662
6663 let dtype: String = actor
6664 .conn
6665 .query_row(
6666 "SELECT ce.dtype FROM chunk_embeddings ce
6667 JOIN document_chunks dc ON dc.chunk_id = ce.chunk_id
6668 WHERE dc.doc_id = ?",
6669 params![report.doc_id.to_string()],
6670 |r| r.get(0),
6671 )
6672 .unwrap();
6673 assert_eq!(dtype, "f32");
6674 }
6675
6676 #[test]
6679 fn forget_document_sets_status_forgotten() {
6680 let (conn, _tmp) = open_test_db();
6681 let (mut actor, _rt, _hnsw) = build_ingest_actor(conn);
6682
6683 let docs_tmp = tempfile::TempDir::new().unwrap();
6684 let path = write_markdown(&docs_tmp, "f.md", "# F\n\nBody.\n");
6685 let (tx, rx) = oneshot::channel();
6686 actor.dispatch_ingest_document(path, ChunkConfig::default(), None, tx);
6687 let report = rx.blocking_recv().unwrap().unwrap();
6688
6689 let forget_report = actor.handle_forget_document(report.doc_id, None).unwrap();
6690 assert_eq!(forget_report.doc_id, report.doc_id);
6691 assert_eq!(forget_report.chunks_tombstoned, report.chunks_persisted);
6692
6693 let status: String = actor
6694 .conn
6695 .query_row(
6696 "SELECT status FROM documents WHERE doc_id = ?",
6697 params![report.doc_id.to_string()],
6698 |r| r.get(0),
6699 )
6700 .unwrap();
6701 assert_eq!(status, "forgotten");
6702 }
6703
6704 #[test]
6705 fn forget_document_tombstones_hnsw_rowids() {
6706 let (conn, _tmp) = open_test_db();
6707 let (mut actor, _rt, hnsw) = build_ingest_actor(conn);
6708
6709 let docs_tmp = tempfile::TempDir::new().unwrap();
6710 let path = write_markdown(&docs_tmp, "t.md", "# T\n\nBody.\n");
6711 let (tx, rx) = oneshot::channel();
6712 actor.dispatch_ingest_document(path, ChunkConfig::default(), None, tx);
6713 let report = rx.blocking_recv().unwrap().unwrap();
6714
6715 let added_before = hnsw.add_count();
6716 let removed_before = hnsw.remove_count();
6717
6718 let _ = actor.handle_forget_document(report.doc_id, None).unwrap();
6719
6720 assert_eq!(
6723 hnsw.remove_count() - removed_before,
6724 report.chunks_persisted as usize
6725 );
6726 assert_eq!(hnsw.add_count(), added_before, "forget must not add");
6727 }
6728
6729 #[test]
6730 fn forget_document_unknown_doc_id_returns_not_found() {
6731 let (conn, _tmp) = open_test_db();
6732 let (mut actor, _rt, _hnsw) = build_ingest_actor(conn);
6733
6734 let err = actor
6735 .handle_forget_document(DocumentId::new(), None)
6736 .unwrap_err();
6737 assert!(err.to_string().contains("not found"), "got: {err}");
6738 }
6739
6740 #[test]
6741 fn forget_document_idempotent() {
6742 let (conn, _tmp) = open_test_db();
6743 let (mut actor, _rt, _hnsw) = build_ingest_actor(conn);
6744
6745 let docs_tmp = tempfile::TempDir::new().unwrap();
6746 let path = write_markdown(&docs_tmp, "idem.md", "# Idem\n\nBody.\n");
6747 let (tx, rx) = oneshot::channel();
6748 actor.dispatch_ingest_document(path, ChunkConfig::default(), None, tx);
6749 let report = rx.blocking_recv().unwrap().unwrap();
6750
6751 let r1 = actor.handle_forget_document(report.doc_id, None).unwrap();
6752 let r2 = actor.handle_forget_document(report.doc_id, None).unwrap();
6753 assert_eq!(r1.doc_id, r2.doc_id);
6754 assert_eq!(r1.chunks_tombstoned, r2.chunks_tombstoned);
6755
6756 let status: String = actor
6758 .conn
6759 .query_row(
6760 "SELECT status FROM documents WHERE doc_id = ?",
6761 params![report.doc_id.to_string()],
6762 |r| r.get(0),
6763 )
6764 .unwrap();
6765 assert_eq!(status, "forgotten");
6766 }
6767
6768 #[test]
6769 fn ingest_document_then_forget_then_reingest_same_content_hash_dedups_forgotten_doc() {
6770 let (conn, _tmp) = open_test_db();
6775 let (mut actor, _rt, hnsw) = build_ingest_actor(conn);
6776
6777 let docs_tmp = tempfile::TempDir::new().unwrap();
6778 let path = write_markdown(&docs_tmp, "fr.md", "# FR\n\nBody.\n");
6779
6780 let (tx, rx) = oneshot::channel();
6781 actor.dispatch_ingest_document(path.clone(), ChunkConfig::default(), None, tx);
6782 let report1 = rx.blocking_recv().unwrap().unwrap();
6783 let _ = actor.handle_forget_document(report1.doc_id, None).unwrap();
6784
6785 let adds_before = hnsw.add_count();
6786 let (tx, rx) = oneshot::channel();
6787 actor.dispatch_ingest_document(path, ChunkConfig::default(), None, tx);
6788 let report2 = rx.blocking_recv().unwrap().unwrap();
6789
6790 assert!(report2.deduped, "forgotten doc still wins dedup");
6791 assert_eq!(report2.doc_id, report1.doc_id);
6792 assert_eq!(report2.chunks_persisted, 0);
6793 assert_eq!(
6794 hnsw.add_count(),
6795 adds_before,
6796 "dedup hit must not add (even though doc is forgotten)"
6797 );
6798
6799 let status: String = actor
6801 .conn
6802 .query_row(
6803 "SELECT status FROM documents WHERE doc_id = ?",
6804 params![report1.doc_id.to_string()],
6805 |r| r.get(0),
6806 )
6807 .unwrap();
6808 assert_eq!(status, "forgotten");
6809 }
6810
6811 fn seed_pending_chunks(
6820 conn: &Connection,
6821 doc_id: &str,
6822 chunk_dim: usize,
6823 n: usize,
6824 ) -> Vec<i64> {
6825 let now_ms = chrono::Utc::now().timestamp_millis();
6826 conn.execute(
6827 "INSERT INTO documents (
6828 doc_id, source, title, mime_type,
6829 ingested_at_ms, modified_at_ms, status,
6830 chunk_count, content_hash, byte_size
6831 ) VALUES (?, ?, ?, ?, ?, NULL, 'active', ?, ?, ?)",
6832 params![
6833 doc_id,
6834 "test://source",
6835 "test",
6836 "text/plain",
6837 now_ms,
6838 n as i64,
6839 format!("{doc_id}_hash"),
6840 100i64,
6841 ],
6842 )
6843 .unwrap();
6844
6845 let mut rowids = Vec::with_capacity(n);
6846 for i in 0..n {
6847 let chunk_id = ChunkId::new().to_string();
6848 conn.execute(
6849 "INSERT INTO document_chunks (
6850 chunk_id, doc_id, chunk_index, content,
6851 token_count, start_offset, end_offset, created_at_ms
6852 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
6853 params![
6854 chunk_id,
6855 doc_id,
6856 i as i64,
6857 format!("chunk {i} text"),
6858 3i64,
6859 (i * 10) as i64,
6860 ((i + 1) * 10) as i64,
6861 now_ms,
6862 ],
6863 )
6864 .unwrap();
6865 let rowid = conn.last_insert_rowid();
6866 rowids.push(rowid);
6867
6868 let zeros = vec![0u8; chunk_dim * 4];
6869 conn.execute(
6870 "INSERT INTO pending_index (
6871 kind, chunk_id, embedding, embedding_dim, enqueued_at
6872 ) VALUES ('chunk', ?, ?, ?, ?)",
6873 params![chunk_id, &zeros[..], chunk_dim as i64, now_ms + i as i64],
6874 )
6875 .unwrap();
6876 }
6877 rowids
6878 }
6879
6880 #[test]
6881 fn recovery_replay_handles_chunk_pending_rows() {
6882 let (mut conn, _tmp) = open_test_db();
6883 let rowids = seed_pending_chunks(&conn, "11111111-1111-1111-1111-111111111111", 4, 3);
6884
6885 let stub = StubVectorIndex::new(4);
6886 let report = crate::recovery::replay_pending_index(&mut conn, &stub).unwrap();
6887 assert_eq!(report.rows_seen, 3);
6888 assert_eq!(report.rows_replayed, 3);
6889 assert_eq!(report.rows_failed, 0);
6890 let added: std::collections::HashSet<i64> =
6895 stub.entries().iter().map(|(r, _)| *r).collect();
6896 let expected: std::collections::HashSet<i64> = rowids
6897 .iter()
6898 .copied()
6899 .map(crate::hnsw_id::chunk_hnsw_id)
6900 .collect();
6901 assert_eq!(added, expected);
6902 let n: i64 = conn
6904 .query_row("SELECT COUNT(*) FROM pending_index", [], |r| r.get(0))
6905 .unwrap();
6906 assert_eq!(n, 0);
6907 }
6908
6909 #[test]
6910 fn recovery_replay_handles_mixed_episode_and_chunk_rows() {
6911 let (mut conn, _tmp) = open_test_db();
6912
6913 let now_ms = chrono::Utc::now().timestamp_millis();
6915 let mut episode_rowids = Vec::new();
6916 for content in &["ep_a", "ep_b"] {
6917 let ep = fixture_episode(content);
6918 let mid = ep.memory_id.to_string();
6919 conn.execute(
6920 "INSERT INTO episodes (
6921 memory_id, ts_ms, source_type, source_id, content,
6922 encoding_context_json, provenance_json, confidence,
6923 strength, salience, tier, created_at_ms, updated_at_ms
6924 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
6925 params![
6926 mid,
6927 ep.ts_ms,
6928 ep.source_type,
6929 ep.source_id,
6930 ep.content,
6931 "{}",
6932 Option::<String>::None,
6933 ep.confidence.0,
6934 ep.strength,
6935 ep.salience,
6936 "hot",
6937 now_ms,
6938 now_ms,
6939 ],
6940 )
6941 .unwrap();
6942 episode_rowids.push(conn.last_insert_rowid());
6943
6944 conn.execute(
6945 "INSERT INTO pending_index (kind, memory_id, embedding, embedding_dim, enqueued_at)
6946 VALUES ('episode', ?, ?, ?, ?)",
6947 params![mid, &vec![0u8; 16][..], 4i64, now_ms],
6948 )
6949 .unwrap();
6950 }
6951
6952 let chunk_rowids =
6954 seed_pending_chunks(&conn, "22222222-2222-2222-2222-222222222222", 4, 2);
6955
6956 let stub = StubVectorIndex::new(4);
6957 let report = crate::recovery::replay_pending_index(&mut conn, &stub).unwrap();
6958 assert_eq!(report.rows_seen, 4);
6959 assert_eq!(report.rows_replayed, 4);
6960 assert_eq!(report.rows_failed, 0);
6961
6962 let added: std::collections::HashSet<i64> =
6968 stub.entries().iter().map(|(r, _)| *r).collect();
6969 let mut expected: std::collections::HashSet<i64> = episode_rowids
6970 .iter()
6971 .copied()
6972 .map(crate::hnsw_id::episode_hnsw_id)
6973 .collect();
6974 expected.extend(
6975 chunk_rowids
6976 .iter()
6977 .copied()
6978 .map(crate::hnsw_id::chunk_hnsw_id),
6979 );
6980 assert_eq!(added, expected);
6981
6982 for r in &episode_rowids {
6988 for c in &chunk_rowids {
6989 let ep_id = crate::hnsw_id::episode_hnsw_id(*r);
6990 let chunk_id = crate::hnsw_id::chunk_hnsw_id(*c);
6991 assert_ne!(
6992 ep_id, chunk_id,
6993 "encoded episode and chunk ids must never collide"
6994 );
6995 }
6996 }
6997
6998 let n: i64 = conn
6999 .query_row("SELECT COUNT(*) FROM pending_index", [], |r| r.get(0))
7000 .unwrap();
7001 assert_eq!(n, 0);
7002 }
7003
7004 #[test]
7022 fn episode_and_chunk_with_same_rowid_coexist_in_hnsw() {
7023 let (conn, _tmp) = open_test_db();
7024 let (mut actor, _rt, hnsw) = build_ingest_actor(conn);
7025
7026 let ep = fixture_episode("episode body");
7028 let now_ms = chrono::Utc::now().timestamp_millis();
7029 let memory_id = ep.memory_id.to_string();
7034 actor
7035 .conn
7036 .execute(
7037 "INSERT INTO episodes (
7038 memory_id, ts_ms, source_type, source_id, content,
7039 encoding_context_json, provenance_json, confidence,
7040 strength, salience, tier, created_at_ms, updated_at_ms
7041 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
7042 params![
7043 memory_id,
7044 ep.ts_ms,
7045 ep.source_type,
7046 ep.source_id,
7047 ep.content,
7048 "{}",
7049 Option::<String>::None,
7050 ep.confidence.0,
7051 ep.strength,
7052 ep.salience,
7053 "hot",
7054 now_ms,
7055 now_ms,
7056 ],
7057 )
7058 .unwrap();
7059 let episode_rowid = actor.conn.last_insert_rowid();
7060 assert_eq!(episode_rowid, 1, "first episode insert must yield rowid=1");
7061 let ep_vec = vec![1.0f32, 0.0, 0.0, 0.0];
7064 hnsw.add(crate::hnsw_id::episode_hnsw_id(episode_rowid), &ep_vec)
7065 .unwrap();
7066
7067 let docs_tmp = tempfile::TempDir::new().unwrap();
7072 let path = write_markdown(
7073 &docs_tmp,
7074 "doc.md",
7075 "# Doc\n\nSome chunk content.\n",
7076 );
7077 let (reply_tx, reply_rx) = oneshot::channel();
7078 actor.dispatch_ingest_document(path, ChunkConfig::default(), None, reply_tx);
7079 let report = reply_rx.blocking_recv().unwrap().expect("ingest ok");
7080 assert_eq!(report.chunks_persisted, 1, "fixture produces one chunk");
7081
7082 let chunk_rowid: i64 = actor
7084 .conn
7085 .query_row(
7086 "SELECT rowid FROM document_chunks WHERE doc_id = ?",
7087 params![report.doc_id.to_string()],
7088 |r| r.get(0),
7089 )
7090 .unwrap();
7091 assert_eq!(
7092 chunk_rowid, episode_rowid,
7093 "chunk rowid must collide numerically with episode rowid for this test (both AUTOINCREMENT sequences start at 1)"
7094 );
7095
7096 let entries = hnsw.entries();
7098 let encoded_episode_id = crate::hnsw_id::episode_hnsw_id(episode_rowid);
7099 let encoded_chunk_id = crate::hnsw_id::chunk_hnsw_id(chunk_rowid);
7100 assert_ne!(
7101 encoded_episode_id, encoded_chunk_id,
7102 "encoded episode and chunk ids must differ even when raw rowids collide"
7103 );
7104 let ids: std::collections::HashSet<i64> =
7105 entries.iter().map(|(r, _)| *r).collect();
7106 assert!(
7107 ids.contains(&encoded_episode_id),
7108 "HNSW must carry episode at encoded id {encoded_episode_id}; entries: {entries:?}"
7109 );
7110 assert!(
7111 ids.contains(&encoded_chunk_id),
7112 "HNSW must carry chunk at encoded id {encoded_chunk_id}; entries: {entries:?}"
7113 );
7114
7115 for (id, _) in &entries {
7117 let (kind, decoded) = crate::hnsw_id::decode_hnsw_id(*id);
7118 match kind {
7119 crate::hnsw_id::HnswIdKind::Episode => {
7120 assert_eq!(decoded, episode_rowid);
7121 }
7122 crate::hnsw_id::HnswIdKind::Chunk => {
7123 assert_eq!(decoded, chunk_rowid);
7124 }
7125 }
7126 }
7127 }
7128
7129 mod ingest_max_bytes {
7136 use super::*;
7137 use crate::writer::{
7138 DEFAULT_INGEST_MAX_BYTES, SOLO_INGEST_MAX_BYTES_ENV, resolve_ingest_max_bytes,
7139 };
7140 use std::sync::Mutex;
7141
7142 static ENV_LOCK: Mutex<()> = Mutex::new(());
7143
7144 struct EnvGuard;
7147 impl Drop for EnvGuard {
7148 fn drop(&mut self) {
7149 unsafe { std::env::remove_var(SOLO_INGEST_MAX_BYTES_ENV) };
7151 }
7152 }
7153 fn fresh_env() -> EnvGuard {
7154 unsafe { std::env::remove_var(SOLO_INGEST_MAX_BYTES_ENV) };
7156 EnvGuard
7157 }
7158
7159 #[test]
7160 fn resolve_unset_returns_default() {
7161 let _lock = ENV_LOCK.lock().unwrap();
7162 let _g = fresh_env();
7163 assert_eq!(resolve_ingest_max_bytes(), Some(DEFAULT_INGEST_MAX_BYTES));
7164 }
7165
7166 #[test]
7167 fn resolve_zero_disables_cap() {
7168 let _lock = ENV_LOCK.lock().unwrap();
7169 let _g = fresh_env();
7170 unsafe { std::env::set_var(SOLO_INGEST_MAX_BYTES_ENV, "0") };
7172 assert_eq!(resolve_ingest_max_bytes(), None);
7173 }
7174
7175 #[test]
7176 fn resolve_positive_integer_uses_value() {
7177 let _lock = ENV_LOCK.lock().unwrap();
7178 let _g = fresh_env();
7179 unsafe { std::env::set_var(SOLO_INGEST_MAX_BYTES_ENV, "1024") };
7180 assert_eq!(resolve_ingest_max_bytes(), Some(1024));
7181 }
7182
7183 #[test]
7184 fn resolve_garbage_falls_back_to_default() {
7185 let _lock = ENV_LOCK.lock().unwrap();
7186 let _g = fresh_env();
7187 unsafe { std::env::set_var(SOLO_INGEST_MAX_BYTES_ENV, "not-a-number") };
7188 assert_eq!(resolve_ingest_max_bytes(), Some(DEFAULT_INGEST_MAX_BYTES));
7189
7190 unsafe { std::env::set_var(SOLO_INGEST_MAX_BYTES_ENV, "-1") };
7191 assert_eq!(resolve_ingest_max_bytes(), Some(DEFAULT_INGEST_MAX_BYTES));
7192
7193 unsafe { std::env::set_var(SOLO_INGEST_MAX_BYTES_ENV, " ") };
7194 assert_eq!(resolve_ingest_max_bytes(), Some(DEFAULT_INGEST_MAX_BYTES));
7195 }
7196
7197 #[test]
7198 fn ingest_rejects_oversized_file_with_clear_error() {
7199 let _lock = ENV_LOCK.lock().unwrap();
7200 let _g = fresh_env();
7201 unsafe { std::env::set_var(SOLO_INGEST_MAX_BYTES_ENV, "1") };
7203
7204 let (conn, _tmp) = open_test_db();
7205 let (mut actor, _rt, hnsw) = build_ingest_actor(conn);
7206
7207 let docs_tmp = tempfile::TempDir::new().unwrap();
7208 let path = write_markdown(
7209 &docs_tmp,
7210 "big.md",
7211 "# Big\n\nThis is well over a single byte of content text.\n",
7212 );
7213
7214 let (reply_tx, reply_rx) = oneshot::channel();
7215 actor.dispatch_ingest_document(path, ChunkConfig::default(), None, reply_tx);
7216 let err = reply_rx.blocking_recv().unwrap().unwrap_err();
7217 let msg = err.to_string();
7218 assert!(
7219 msg.contains("SOLO_INGEST_MAX_BYTES")
7220 && msg.contains("exceeds")
7221 && msg.contains("disable"),
7222 "rejection message must call out the env var, threshold, and disable hint; got: {msg}"
7223 );
7224 let n_docs: i64 = actor
7226 .conn
7227 .query_row("SELECT COUNT(*) FROM documents", [], |r| r.get(0))
7228 .unwrap();
7229 assert_eq!(n_docs, 0);
7230 assert_eq!(hnsw.add_count(), 0);
7231 }
7232
7233 #[test]
7234 fn ingest_allows_undersized_file_under_custom_cap() {
7235 let _lock = ENV_LOCK.lock().unwrap();
7236 let _g = fresh_env();
7237 unsafe { std::env::set_var(SOLO_INGEST_MAX_BYTES_ENV, "4096") };
7239
7240 let (conn, _tmp) = open_test_db();
7241 let (mut actor, _rt, _hnsw) = build_ingest_actor(conn);
7242
7243 let docs_tmp = tempfile::TempDir::new().unwrap();
7244 let path = write_markdown(&docs_tmp, "ok.md", "# OK\n\nShort body.\n");
7245
7246 let (reply_tx, reply_rx) = oneshot::channel();
7247 actor.dispatch_ingest_document(path, ChunkConfig::default(), None, reply_tx);
7248 let report = reply_rx.blocking_recv().unwrap().expect("ingest under cap must succeed");
7249 assert!(!report.deduped);
7250 assert_eq!(report.chunks_persisted, 1);
7251 }
7252
7253 #[test]
7254 fn ingest_with_cap_zero_allows_any_size() {
7255 let _lock = ENV_LOCK.lock().unwrap();
7256 let _g = fresh_env();
7257 unsafe { std::env::set_var(SOLO_INGEST_MAX_BYTES_ENV, "0") };
7259
7260 let (conn, _tmp) = open_test_db();
7261 let (mut actor, _rt, _hnsw) = build_ingest_actor(conn);
7262
7263 let docs_tmp = tempfile::TempDir::new().unwrap();
7264 let path = write_markdown(
7266 &docs_tmp,
7267 "any.md",
7268 "# Any\n\nWith SOLO_INGEST_MAX_BYTES=0 any size is allowed.\n",
7269 );
7270
7271 let (reply_tx, reply_rx) = oneshot::channel();
7272 actor.dispatch_ingest_document(path, ChunkConfig::default(), None, reply_tx);
7273 let report = reply_rx.blocking_recv().unwrap().expect("cap=0 disables cap");
7274 assert_eq!(report.chunks_persisted, 1);
7275 }
7276 }
7277
7278 mod audit_emit_tests {
7289 use super::*;
7290 use crate::audit::AuditOperation;
7291
7292 fn build_ingest_actor_for_audit() -> (
7297 WriterActor,
7298 tokio::runtime::Runtime,
7299 tempfile::TempDir,
7300 ) {
7301 let runtime = tokio::runtime::Builder::new_multi_thread()
7302 .worker_threads(2)
7303 .enable_all()
7304 .build()
7305 .unwrap();
7306 let tmp = tempfile::TempDir::new().unwrap();
7307 let path = tmp.path().join("test.db");
7308 let conn = crate::test_support::open_test_db_at(&path);
7309
7310 let embedder: Arc<dyn solo_core::Embedder> =
7311 Arc::new(crate::StubEmbedder::new("stub", "v1", 4));
7312 let identity = crate::EmbedderIdentity {
7313 name: "stub".into(),
7314 version: "v1".into(),
7315 dim: 4,
7316 dtype: "f32".into(),
7317 };
7318 let embedder_id =
7319 crate::get_or_insert_embedder_id(&conn, &identity).unwrap();
7320
7321 let hnsw: Arc<dyn solo_core::VectorIndex + Send + Sync> =
7322 Arc::new(crate::test_support::StubVectorIndex::new(4));
7323 let (_tx, rx) = mpsc::channel(1);
7324 let actor = WriterActor {
7325 conn,
7326 hnsw,
7327 rx,
7328 snapshot_dir: None,
7329 embedder_id: Some(embedder_id),
7330 embedder: Some(embedder),
7331 runtime_handle: Some(runtime.handle().clone()),
7332 steward: None,
7333 steward_slot: None,
7334 triples_batch_signal: None,
7335 key: None,
7336 redactor: disabled_test_redactor(),
7337 quota_bytes: None,
7338 db_path: None,
7339 invalidate_tx: None,
7340 invalidate_tenant_id: None,
7341 };
7342 (actor, runtime, tmp)
7343 }
7344
7345 fn count_audit_rows_for_op(conn: &Connection, op: AuditOperation) -> i64 {
7346 conn.query_row(
7347 "SELECT COUNT(*) FROM audit_events WHERE operation = ?",
7348 params![op.as_str()],
7349 |r| r.get(0),
7350 )
7351 .unwrap()
7352 }
7353
7354 #[test]
7355 fn dispatch_remember_emits_audit_row_with_ok_result() {
7356 let (conn, _tmp) = open_test_db();
7357 let hnsw = Arc::new(crate::test_support::StubVectorIndex::new(4));
7358 let (_tx, rx) = mpsc::channel(1);
7359 let mut actor = WriterActor {
7360 conn,
7361 hnsw,
7362 rx,
7363 snapshot_dir: None,
7364 embedder_id: None,
7365 embedder: None,
7366 runtime_handle: None,
7367 steward: None,
7368 steward_slot: None,
7369 triples_batch_signal: None,
7370 key: None,
7371 redactor: disabled_test_redactor(),
7372 quota_bytes: None,
7373 db_path: None,
7374 invalidate_tx: None,
7375 invalidate_tenant_id: None,
7376 };
7377
7378 let (reply_tx, reply_rx) = oneshot::channel();
7379 let episode = fixture_episode("audit-remember");
7380 actor.dispatch_remember(
7381 episode.clone(),
7382 fixture_embedding(4),
7383 Some("alice".into()),
7384 reply_tx,
7385 );
7386 assert!(reply_rx.blocking_recv().unwrap().is_ok());
7387
7388 let (op, principal, target, result): (String, Option<String>, Option<String>, String) =
7389 actor
7390 .conn
7391 .query_row(
7392 "SELECT operation, principal_subject, target_id, result \
7393 FROM audit_events ORDER BY audit_id DESC LIMIT 1",
7394 [],
7395 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
7396 )
7397 .unwrap();
7398 assert_eq!(op, "memory.remember");
7399 assert_eq!(principal.as_deref(), Some("alice"));
7400 assert_eq!(target.as_deref(), Some(episode.memory_id.to_string().as_str()));
7401 assert_eq!(result, "ok");
7402 }
7403
7404 #[test]
7405 fn dispatch_remember_with_none_principal_persists_null() {
7406 let (conn, _tmp) = open_test_db();
7407 let hnsw = Arc::new(crate::test_support::StubVectorIndex::new(4));
7408 let (_tx, rx) = mpsc::channel(1);
7409 let mut actor = WriterActor {
7410 conn,
7411 hnsw,
7412 rx,
7413 snapshot_dir: None,
7414 embedder_id: None,
7415 embedder: None,
7416 runtime_handle: None,
7417 steward: None,
7418 steward_slot: None,
7419 triples_batch_signal: None,
7420 key: None,
7421 redactor: disabled_test_redactor(),
7422 quota_bytes: None,
7423 db_path: None,
7424 invalidate_tx: None,
7425 invalidate_tenant_id: None,
7426 };
7427
7428 let (reply_tx, reply_rx) = oneshot::channel();
7429 let episode = fixture_episode("audit-remember-noprincipal");
7430 actor.dispatch_remember(
7431 episode.clone(),
7432 fixture_embedding(4),
7433 None,
7434 reply_tx,
7435 );
7436 assert!(reply_rx.blocking_recv().unwrap().is_ok());
7437
7438 let principal: Option<String> = actor
7439 .conn
7440 .query_row(
7441 "SELECT principal_subject FROM audit_events ORDER BY audit_id DESC LIMIT 1",
7442 [],
7443 |r| r.get(0),
7444 )
7445 .unwrap();
7446 assert!(principal.is_none());
7447 }
7448
7449 #[test]
7450 fn handle_forget_emits_audit_row() {
7451 let (conn, _tmp) = open_test_db();
7452 let hnsw = Arc::new(crate::test_support::StubVectorIndex::new(4));
7453 let (_tx, rx) = mpsc::channel(1);
7454 let mut actor = WriterActor {
7455 conn,
7456 hnsw,
7457 rx,
7458 snapshot_dir: None,
7459 embedder_id: None,
7460 embedder: None,
7461 runtime_handle: None,
7462 steward: None,
7463 steward_slot: None,
7464 triples_batch_signal: None,
7465 key: None,
7466 redactor: disabled_test_redactor(),
7467 quota_bytes: None,
7468 db_path: None,
7469 invalidate_tx: None,
7470 invalidate_tenant_id: None,
7471 };
7472
7473 let (reply_tx, reply_rx) = oneshot::channel();
7475 let episode = fixture_episode("to-forget");
7476 actor.dispatch_remember(
7477 episode.clone(),
7478 fixture_embedding(4),
7479 None,
7480 reply_tx,
7481 );
7482 reply_rx.blocking_recv().unwrap().unwrap();
7483
7484 assert_eq!(
7486 count_audit_rows_for_op(&actor.conn, AuditOperation::MemoryRemember),
7487 1
7488 );
7489
7490 actor
7491 .handle_forget(episode.memory_id, "test".into(), Some("bob".into()))
7492 .unwrap();
7493 assert_eq!(
7494 count_audit_rows_for_op(&actor.conn, AuditOperation::MemoryForget),
7495 1
7496 );
7497 let principal: Option<String> = actor
7499 .conn
7500 .query_row(
7501 "SELECT principal_subject FROM audit_events \
7502 WHERE operation = 'memory.forget' \
7503 ORDER BY audit_id DESC LIMIT 1",
7504 [],
7505 |r| r.get(0),
7506 )
7507 .unwrap();
7508 assert_eq!(principal.as_deref(), Some("bob"));
7509 }
7510
7511 #[test]
7512 fn handle_forget_unknown_id_emits_no_success_row_emits_error_row_via_dispatch() {
7513 let (conn, _tmp) = open_test_db();
7516 let hnsw = Arc::new(crate::test_support::StubVectorIndex::new(4));
7517 let (_tx, rx) = mpsc::channel(1);
7518 let mut actor = WriterActor {
7519 conn,
7520 hnsw,
7521 rx,
7522 snapshot_dir: None,
7523 embedder_id: None,
7524 embedder: None,
7525 runtime_handle: None,
7526 steward: None,
7527 steward_slot: None,
7528 triples_batch_signal: None,
7529 key: None,
7530 redactor: disabled_test_redactor(),
7531 quota_bytes: None,
7532 db_path: None,
7533 invalidate_tx: None,
7534 invalidate_tenant_id: None,
7535 };
7536
7537 let unknown = MemoryId::new();
7538 let cmd = WriteCommand::Forget {
7540 memory_id: unknown,
7541 reason: "test".into(),
7542 audit_principal: Some("carol".into()),
7543 reply: oneshot::channel().0,
7544 };
7545 actor.dispatch(cmd);
7546
7547 let (result, principal): (String, Option<String>) = actor
7548 .conn
7549 .query_row(
7550 "SELECT result, principal_subject FROM audit_events \
7551 WHERE operation = 'memory.forget' \
7552 ORDER BY audit_id DESC LIMIT 1",
7553 [],
7554 |r| Ok((r.get(0)?, r.get(1)?)),
7555 )
7556 .unwrap();
7557 assert_eq!(result, "error");
7558 assert_eq!(principal.as_deref(), Some("carol"));
7559 }
7560
7561 #[test]
7562 fn handle_forget_document_emits_audit_row() {
7563 let (mut actor, runtime, tmp) = build_ingest_actor_for_audit();
7564 let docs_dir = tmp.path().join("docs");
7566 std::fs::create_dir_all(&docs_dir).unwrap();
7567 let path = docs_dir.join("test.md");
7568 std::fs::write(&path, "# audit doc\nsome content").unwrap();
7569 let (reply_tx, reply_rx) = oneshot::channel();
7570 actor.dispatch_ingest_document(
7571 path,
7572 crate::document::ChunkConfig::default(),
7573 None,
7574 reply_tx,
7575 );
7576 let report = reply_rx.blocking_recv().unwrap().unwrap();
7577
7578 let _ = actor
7580 .handle_forget_document(report.doc_id, Some("dora".into()))
7581 .unwrap();
7582 assert_eq!(
7583 count_audit_rows_for_op(&actor.conn, AuditOperation::MemoryForgetDocument),
7584 1
7585 );
7586 let principal: Option<String> = actor
7587 .conn
7588 .query_row(
7589 "SELECT principal_subject FROM audit_events \
7590 WHERE operation = 'memory.forget_document' \
7591 ORDER BY audit_id DESC LIMIT 1",
7592 [],
7593 |r| r.get(0),
7594 )
7595 .unwrap();
7596 assert_eq!(principal.as_deref(), Some("dora"));
7597 drop(runtime);
7598 }
7599
7600 #[test]
7601 fn ingest_document_emits_one_audit_row() {
7602 let (mut actor, runtime, tmp) = build_ingest_actor_for_audit();
7603 let docs_dir = tmp.path().join("docs");
7604 std::fs::create_dir_all(&docs_dir).unwrap();
7605 let path = docs_dir.join("ingest.md");
7606 std::fs::write(&path, "# ingested\nbody").unwrap();
7607 let (reply_tx, reply_rx) = oneshot::channel();
7608 actor.dispatch_ingest_document(
7609 path,
7610 crate::document::ChunkConfig::default(),
7611 Some("eve".into()),
7612 reply_tx,
7613 );
7614 let _ = reply_rx.blocking_recv().unwrap().unwrap();
7615
7616 assert_eq!(
7617 count_audit_rows_for_op(&actor.conn, AuditOperation::MemoryIngestDocument),
7618 1
7619 );
7620 let (principal, result): (Option<String>, String) = actor
7621 .conn
7622 .query_row(
7623 "SELECT principal_subject, result FROM audit_events \
7624 WHERE operation = 'memory.ingest_document' \
7625 ORDER BY audit_id DESC LIMIT 1",
7626 [],
7627 |r| Ok((r.get(0)?, r.get(1)?)),
7628 )
7629 .unwrap();
7630 assert_eq!(principal.as_deref(), Some("eve"));
7631 assert_eq!(result, "ok");
7632 drop(runtime);
7633 }
7634
7635 #[test]
7636 fn normalize_subjects_emits_audit_row_inside_tx() {
7637 let (conn, _tmp) = open_test_db();
7638 let now_ms = chrono::Utc::now().timestamp_millis();
7640 conn.execute(
7641 "INSERT INTO triples (
7642 triple_id, subject_id, predicate, object_id, object_kind,
7643 valid_from_ms, valid_to_ms, confidence, provenance_json,
7644 created_at_ms, updated_at_ms
7645 ) VALUES (?, 'alex', 'uses', 'rust', 'literal', ?, NULL, 0.9, '{}', ?, ?)",
7646 params![
7647 "00000000-0000-0000-0000-000000000010",
7648 now_ms,
7649 now_ms,
7650 now_ms
7651 ],
7652 )
7653 .unwrap();
7654 let hnsw = Arc::new(crate::test_support::StubVectorIndex::new(4));
7655 let (_tx, rx) = mpsc::channel(1);
7656 let mut actor = WriterActor {
7657 conn,
7658 hnsw,
7659 rx,
7660 snapshot_dir: None,
7661 embedder_id: None,
7662 embedder: None,
7663 runtime_handle: None,
7664 steward: None,
7665 steward_slot: None,
7666 triples_batch_signal: None,
7667 key: None,
7668 redactor: disabled_test_redactor(),
7669 quota_bytes: None,
7670 db_path: None,
7671 invalidate_tx: None,
7672 invalidate_tenant_id: None,
7673 };
7674 let _ = actor
7675 .handle_normalize_subjects(
7676 vec![("alex".into(), "user".into())],
7677 false,
7678 Some("frank".into()),
7679 )
7680 .unwrap();
7681 assert_eq!(
7682 count_audit_rows_for_op(
7683 &actor.conn,
7684 AuditOperation::MemoryNormalizeSubjects,
7685 ),
7686 1
7687 );
7688 let principal: Option<String> = actor
7689 .conn
7690 .query_row(
7691 "SELECT principal_subject FROM audit_events \
7692 WHERE operation = 'memory.normalize_subjects' \
7693 ORDER BY audit_id DESC LIMIT 1",
7694 [],
7695 |r| r.get(0),
7696 )
7697 .unwrap();
7698 assert_eq!(principal.as_deref(), Some("frank"));
7699 }
7700 }
7701
7702 mod redaction_tests {
7710 use super::*;
7711 use crate::test_support::{enabled_test_redactor, open_test_db};
7712 use std::sync::Arc;
7713
7714 fn build_redacting_actor(conn: Connection) -> (WriterActor, Arc<StubVectorIndex>) {
7715 let hnsw = Arc::new(StubVectorIndex::new(4));
7716 let (_tx, rx) = mpsc::channel(1);
7717 let actor = WriterActor {
7718 conn,
7719 hnsw: hnsw.clone(),
7720 rx,
7721 snapshot_dir: None,
7722 embedder_id: None,
7723 embedder: None,
7724 runtime_handle: None,
7725 steward: None,
7726 steward_slot: None,
7727 triples_batch_signal: None,
7728 key: None,
7729 redactor: enabled_test_redactor(),
7730 quota_bytes: None,
7731 db_path: None,
7732 invalidate_tx: None,
7733 invalidate_tenant_id: None,
7734 };
7735 (actor, hnsw)
7736 }
7737
7738 #[test]
7739 fn redacted_content_lands_on_disk_for_remember() {
7740 let (conn, _tmp) = open_test_db();
7741 let (mut actor, _hnsw) = build_redacting_actor(conn);
7742
7743 let mut episode = fixture_episode("contact me at user@example.com please");
7744 let mid = episode.memory_id;
7745 let embedding = fixture_embedding(4);
7746 let (tx, rx) = oneshot::channel();
7747 actor.dispatch_remember(
7748 std::mem::replace(&mut episode, fixture_episode("placeholder")),
7749 embedding,
7750 Some("alice".into()),
7751 tx,
7752 );
7753 assert!(rx.blocking_recv().unwrap().is_ok());
7754
7755 let content: String = actor
7756 .conn
7757 .query_row(
7758 "SELECT content FROM episodes WHERE memory_id = ?",
7759 params![mid.to_string()],
7760 |r| r.get(0),
7761 )
7762 .unwrap();
7763 assert!(content.contains("[REDACTED:email]"), "got `{content}`");
7764 assert!(!content.contains("user@example.com"));
7765 }
7766
7767 #[test]
7768 fn redaction_audit_row_emitted_with_pattern_counts() {
7769 let (conn, _tmp) = open_test_db();
7770 let (mut actor, _hnsw) = build_redacting_actor(conn);
7771
7772 let episode = fixture_episode(
7773 "ssn 123-45-6789 phone 555-123-4567 mail a@b.com",
7774 );
7775 let mid = episode.memory_id;
7776 let (tx, rx) = oneshot::channel();
7777 actor.dispatch_remember(
7778 episode,
7779 fixture_embedding(4),
7780 Some("carol".into()),
7781 tx,
7782 );
7783 rx.blocking_recv().unwrap().unwrap();
7784
7785 let (op, target, details_json): (String, Option<String>, Option<String>) = actor
7786 .conn
7787 .query_row(
7788 "SELECT operation, target_id, details_json \
7789 FROM audit_events WHERE operation = 'redaction.applied'",
7790 [],
7791 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
7792 )
7793 .unwrap();
7794 assert_eq!(op, "redaction.applied");
7795 assert_eq!(target.as_deref(), Some(mid.to_string().as_str()));
7796 let details: serde_json::Value =
7797 serde_json::from_str(details_json.as_deref().unwrap()).unwrap();
7798 let names: Vec<String> = details["matches"]
7799 .as_array()
7800 .unwrap()
7801 .iter()
7802 .map(|m| m["pattern_name"].as_str().unwrap().to_string())
7803 .collect();
7804 assert!(names.contains(&"email".to_string()));
7805 assert!(names.contains(&"ssn".to_string()));
7806 assert!(names.contains(&"us_phone".to_string()));
7807 }
7808
7809 #[test]
7810 fn audit_row_does_not_contain_original_pii() {
7811 let (conn, _tmp) = open_test_db();
7814 let (mut actor, _hnsw) = build_redacting_actor(conn);
7815
7816 let episode = fixture_episode("email leak@example.com here");
7817 let (tx, rx) = oneshot::channel();
7818 actor.dispatch_remember(
7819 episode,
7820 fixture_embedding(4),
7821 Some("dan".into()),
7822 tx,
7823 );
7824 rx.blocking_recv().unwrap().unwrap();
7825
7826 let details: Option<String> = actor
7827 .conn
7828 .query_row(
7829 "SELECT details_json FROM audit_events \
7830 WHERE operation = 'redaction.applied'",
7831 [],
7832 |r| r.get(0),
7833 )
7834 .unwrap();
7835 let d = details.expect("redaction audit row must have details");
7836 assert!(!d.contains("leak@example.com"), "PII leaked into audit: `{d}`");
7837 assert!(!d.contains("leak"), "PII fragment in audit: `{d}`");
7838 }
7839
7840 #[test]
7841 fn principal_subject_persisted_on_episode_row() {
7842 let (conn, _tmp) = open_test_db();
7845 let (mut actor, _hnsw) = build_redacting_actor(conn);
7846
7847 let episode = fixture_episode("plain content");
7848 let mid = episode.memory_id;
7849 let (tx, rx) = oneshot::channel();
7850 actor.dispatch_remember(
7851 episode,
7852 fixture_embedding(4),
7853 Some("erin".into()),
7854 tx,
7855 );
7856 rx.blocking_recv().unwrap().unwrap();
7857
7858 let principal: Option<String> = actor
7859 .conn
7860 .query_row(
7861 "SELECT principal_subject FROM episodes WHERE memory_id = ?",
7862 params![mid.to_string()],
7863 |r| r.get(0),
7864 )
7865 .unwrap();
7866 assert_eq!(principal.as_deref(), Some("erin"));
7867 }
7868
7869 #[test]
7870 fn no_redaction_audit_row_when_no_matches() {
7871 let (conn, _tmp) = open_test_db();
7873 let (mut actor, _hnsw) = build_redacting_actor(conn);
7874 let episode = fixture_episode("no pii here at all");
7875 let (tx, rx) = oneshot::channel();
7876 actor.dispatch_remember(
7877 episode,
7878 fixture_embedding(4),
7879 Some("frank".into()),
7880 tx,
7881 );
7882 rx.blocking_recv().unwrap().unwrap();
7883
7884 let n: i64 = actor
7885 .conn
7886 .query_row(
7887 "SELECT COUNT(*) FROM audit_events WHERE operation = 'redaction.applied'",
7888 [],
7889 |r| r.get(0),
7890 )
7891 .unwrap();
7892 assert_eq!(n, 0);
7893 }
7894
7895 #[test]
7896 fn read_path_returns_redacted_content() {
7897 let (conn, _tmp) = open_test_db();
7902 let (mut actor, _hnsw) = build_redacting_actor(conn);
7903
7904 let episode = fixture_episode("token gh PAT ghp_abcdefghijABCDEFGHIJabcdefghijABCDEF12 done");
7905 let mid = episode.memory_id;
7906 let (tx, rx) = oneshot::channel();
7907 actor.dispatch_remember(
7908 episode,
7909 fixture_embedding(4),
7910 None,
7911 tx,
7912 );
7913 rx.blocking_recv().unwrap().unwrap();
7914
7915 let content: String = actor
7916 .conn
7917 .query_row(
7918 "SELECT content FROM episodes WHERE memory_id = ?",
7919 params![mid.to_string()],
7920 |r| r.get(0),
7921 )
7922 .unwrap();
7923 assert!(content.contains("[REDACTED:github_pat]"), "got `{content}`");
7924 assert!(!content.contains("ghp_abcdefghij"));
7925 }
7926 }
7927
7928 mod quota_tests {
7930 use super::*;
7931
7932 #[test]
7933 fn unlimited_branch_short_circuits_without_db_path() {
7934 let decision = check_quota(None, None, 1_000_000);
7937 assert_eq!(decision, QuotaDecision::Unlimited);
7938 }
7939
7940 #[test]
7941 fn allowed_when_current_plus_growth_fits_under_quota() {
7942 let tmp = tempfile::NamedTempFile::new().unwrap();
7943 std::fs::write(tmp.path(), vec![0u8; 100]).unwrap();
7945 let decision = check_quota(Some(1000), Some(tmp.path()), 200);
7946 assert!(
7947 matches!(decision, QuotaDecision::Allowed { current_size: 100, quota: 1000 }),
7948 "got {decision:?}"
7949 );
7950 }
7951
7952 #[test]
7953 fn allowed_when_current_plus_growth_exactly_hits_quota() {
7954 let tmp = tempfile::NamedTempFile::new().unwrap();
7956 std::fs::write(tmp.path(), vec![0u8; 500]).unwrap();
7957 let decision = check_quota(Some(1000), Some(tmp.path()), 500);
7958 assert!(
7959 matches!(decision, QuotaDecision::Allowed { .. }),
7960 "exactly-on-quota must be allowed: got {decision:?}"
7961 );
7962 }
7963
7964 #[test]
7965 fn exceeded_when_growth_would_overflow_quota() {
7966 let tmp = tempfile::NamedTempFile::new().unwrap();
7967 std::fs::write(tmp.path(), vec![0u8; 900]).unwrap();
7968 let decision = check_quota(Some(1000), Some(tmp.path()), 200);
7969 assert!(
7970 matches!(decision, QuotaDecision::Exceeded {
7971 current_size: 900,
7972 estimated_growth: 200,
7973 quota: 1000,
7974 }),
7975 "got {decision:?}"
7976 );
7977 }
7978
7979 #[test]
7980 fn exceeded_payload_renders_into_audit_json() {
7981 let err = QuotaExceededError {
7982 current_size: 900,
7983 estimated_growth: 200,
7984 quota: 1000,
7985 };
7986 let v = err.to_details_json();
7987 assert_eq!(v["reason"], "quota_exceeded");
7988 assert_eq!(v["current_size"], 900);
7989 assert_eq!(v["estimated_growth"], 200);
7990 assert_eq!(v["quota"], 1000);
7991 }
7992
7993 #[test]
7994 fn handle_remember_durable_rejects_when_quota_exceeded() {
7995 let (conn, tmp) = open_test_db();
8000 let db_path = tmp.path().join("test.db");
8001 let hnsw: Arc<dyn solo_core::VectorIndex + Send + Sync> =
8005 Arc::new(crate::test_support::StubVectorIndex::new(4));
8006 let (_tx, rx) = mpsc::channel(1);
8007 let mut actor = WriterActor {
8008 conn,
8009 hnsw,
8010 rx,
8011 snapshot_dir: None,
8012 embedder_id: None,
8013 embedder: None,
8014 runtime_handle: None,
8015 steward: None,
8016 steward_slot: None,
8017 triples_batch_signal: None,
8018 key: None,
8019 redactor: disabled_test_redactor(),
8020 quota_bytes: Some(10),
8023 db_path: Some(db_path),
8024 invalidate_tx: None,
8025 invalidate_tenant_id: None,
8026 };
8027
8028 let ep = fixture_episode("this exceeds the 10-byte quota easily");
8029 let result = actor.handle_remember_durable(
8030 ep,
8031 fixture_embedding(4),
8032 Some("erin".into()),
8033 );
8034 assert!(
8035 matches!(result, Err(solo_core::Error::Forbidden(_))),
8036 "must reject with Forbidden, got: {result:?}"
8037 );
8038
8039 let count: i64 = actor
8041 .conn
8042 .query_row(
8043 "SELECT COUNT(*) FROM audit_events \
8044 WHERE operation='memory.remember' AND result='forbidden'",
8045 [],
8046 |r| r.get(0),
8047 )
8048 .unwrap();
8049 assert_eq!(count, 1, "forbidden audit row must land");
8050
8051 let details: String = actor
8053 .conn
8054 .query_row(
8055 "SELECT details_json FROM audit_events \
8056 WHERE operation='memory.remember' AND result='forbidden' \
8057 ORDER BY audit_id DESC LIMIT 1",
8058 [],
8059 |r| r.get(0),
8060 )
8061 .unwrap();
8062 let v: serde_json::Value = serde_json::from_str(&details).unwrap();
8063 assert_eq!(v["reason"], "quota_exceeded");
8064 assert_eq!(v["quota"], 10);
8065 }
8066
8067 #[test]
8068 fn handle_remember_durable_proceeds_when_quota_unlimited() {
8069 let (conn, _tmp) = open_test_db();
8072 let hnsw: Arc<dyn solo_core::VectorIndex + Send + Sync> =
8073 Arc::new(crate::test_support::StubVectorIndex::new(4));
8074 let (_tx, rx) = mpsc::channel(1);
8075 let mut actor = WriterActor {
8076 conn,
8077 hnsw,
8078 rx,
8079 snapshot_dir: None,
8080 embedder_id: None,
8081 embedder: None,
8082 runtime_handle: None,
8083 steward: None,
8084 steward_slot: None,
8085 triples_batch_signal: None,
8086 key: None,
8087 redactor: disabled_test_redactor(),
8088 quota_bytes: None,
8089 db_path: None,
8090 invalidate_tx: None,
8091 invalidate_tenant_id: None,
8092 };
8093 let ep = fixture_episode("any content");
8094 let result = actor.handle_remember_durable(
8095 ep,
8096 fixture_embedding(4),
8097 None,
8098 );
8099 assert!(result.is_ok(), "unlimited quota must allow the write: {result:?}");
8100 }
8101 }
8102
8103 mod p4a_steward_slot_tests {
8108 use super::*;
8109 use solo_steward::{Steward, StewardConfig, test_support::StubLlmClient};
8110
8111 fn arc_stub_steward() -> Arc<Steward> {
8112 Arc::new(Steward::new(
8113 Arc::new(StubLlmClient::default_stub().pretend_real_llm(true)),
8114 StewardConfig::default(),
8115 ))
8116 }
8117
8118 fn build_actor(
8119 steward: Option<Arc<Steward>>,
8120 slot: Option<Arc<AsyncRwLock<Option<Arc<Steward>>>>>,
8121 ) -> WriterActor {
8122 let (conn, _tmp) = open_test_db();
8123 std::mem::forget(_tmp);
8127 let hnsw = Arc::new(StubVectorIndex::new(4));
8128 let (_tx, rx) = mpsc::channel(1);
8129 WriterActor {
8130 conn,
8131 hnsw,
8132 rx,
8133 snapshot_dir: None,
8134 embedder_id: None,
8135 embedder: None,
8136 runtime_handle: None,
8137 steward,
8138 steward_slot: slot,
8139 triples_batch_signal: None,
8140 key: None,
8141 redactor: disabled_test_redactor(),
8142 quota_bytes: None,
8143 db_path: None,
8144 invalidate_tx: None,
8145 invalidate_tenant_id: None,
8146 }
8147 }
8148
8149 #[test]
8155 fn writer_actor_consults_steward_slot_when_self_steward_is_none() {
8156 let slot_steward = arc_stub_steward();
8157 let slot = Arc::new(AsyncRwLock::new(Some(slot_steward.clone())));
8158 let actor = build_actor(None, Some(slot));
8159 let resolved = actor.current_steward().expect("slot populated");
8160 assert!(
8161 Arc::ptr_eq(&resolved, &slot_steward),
8162 "current_steward must return the slot's Steward when self.steward is None",
8163 );
8164 }
8165
8166 #[test]
8171 fn writer_actor_prefers_slot_over_self_steward_when_both_set() {
8172 let slot_steward = arc_stub_steward();
8173 let eager_steward = arc_stub_steward();
8174 assert!(!Arc::ptr_eq(&slot_steward, &eager_steward));
8177
8178 let slot = Arc::new(AsyncRwLock::new(Some(slot_steward.clone())));
8179 let actor = build_actor(Some(eager_steward.clone()), Some(slot));
8180 let resolved = actor.current_steward().expect("slot populated");
8181 assert!(
8182 Arc::ptr_eq(&resolved, &slot_steward),
8183 "current_steward must prefer the slot when both are set",
8184 );
8185 assert!(
8186 !Arc::ptr_eq(&resolved, &eager_steward),
8187 "current_steward must NOT return self.steward when slot is populated",
8188 );
8189 }
8190
8191 #[test]
8196 fn writer_actor_uses_self_steward_when_slot_is_none() {
8197 let eager_steward = arc_stub_steward();
8198 let slot = Arc::new(AsyncRwLock::new(None));
8201 let actor = build_actor(Some(eager_steward.clone()), Some(slot));
8202 let resolved = actor.current_steward().expect("self.steward populated");
8203 assert!(
8204 Arc::ptr_eq(&resolved, &eager_steward),
8205 "current_steward must fall back to self.steward when the slot is empty",
8206 );
8207 }
8208
8209 #[test]
8214 fn writer_actor_returns_none_when_slot_and_self_steward_are_both_none() {
8215 let actor = build_actor(None, Some(Arc::new(AsyncRwLock::new(None))));
8216 assert!(actor.current_steward().is_none());
8217 }
8218
8219 #[test]
8225 fn writer_actor_falls_back_when_slot_is_unwired() {
8226 let eager_steward = arc_stub_steward();
8227 let actor = build_actor(Some(eager_steward.clone()), None);
8228 let resolved = actor.current_steward().expect("self.steward populated");
8229 assert!(
8230 Arc::ptr_eq(&resolved, &eager_steward),
8231 "no slot wired → current_steward returns self.steward",
8232 );
8233 }
8234
8235 #[test]
8268 fn current_steward_falls_back_to_self_steward_on_read_contention() {
8269 let slot_steward = arc_stub_steward();
8270 let eager_steward = arc_stub_steward();
8271 assert!(
8272 !Arc::ptr_eq(&slot_steward, &eager_steward),
8273 "slot and self.steward must be distinct Arc allocations for ptr_eq to discriminate"
8274 );
8275
8276 let slot = Arc::new(AsyncRwLock::new(Some(slot_steward.clone())));
8277 let actor = build_actor(Some(eager_steward.clone()), Some(slot.clone()));
8278
8279 let baseline = actor.current_steward().expect("baseline: slot populated");
8281 assert!(
8282 Arc::ptr_eq(&baseline, &slot_steward),
8283 "baseline (no contention) must return the slot's Steward"
8284 );
8285
8286 let runtime = tokio::runtime::Builder::new_multi_thread()
8290 .worker_threads(2)
8291 .enable_all()
8292 .build()
8293 .unwrap();
8294
8295 let (lock_held_tx, lock_held_rx) =
8300 std::sync::mpsc::sync_channel::<()>(1);
8301 let (release_tx, release_rx) =
8302 tokio::sync::oneshot::channel::<()>();
8303
8304 let slot_for_holder = slot.clone();
8305 let holder = runtime.spawn(async move {
8306 let guard = slot_for_holder.write().await;
8307 let _ = lock_held_tx.send(());
8308 let _ = release_rx.await;
8310 drop(guard);
8312 });
8313
8314 lock_held_rx
8318 .recv()
8319 .expect("holder must signal lock acquisition");
8320
8321 let under_contention = actor
8325 .current_steward()
8326 .expect("self.steward populated, fallback must succeed");
8327 assert!(
8328 Arc::ptr_eq(&under_contention, &eager_steward),
8329 "under read-contention, current_steward must fall back to self.steward"
8330 );
8331 assert!(
8332 !Arc::ptr_eq(&under_contention, &slot_steward),
8333 "under read-contention, current_steward must NOT return the slot's Steward"
8334 );
8335
8336 let _ = release_tx.send(());
8338 runtime.block_on(holder).unwrap();
8339
8340 let after_release = actor.current_steward().expect("post-release: slot");
8342 assert!(
8343 Arc::ptr_eq(&after_release, &slot_steward),
8344 "after lock release, current_steward must return the slot's Steward again"
8345 );
8346 }
8347
8348 #[test]
8354 fn current_steward_returns_none_under_contention_when_self_steward_is_none() {
8355 let slot_steward = arc_stub_steward();
8356 let slot = Arc::new(AsyncRwLock::new(Some(slot_steward.clone())));
8357 let actor = build_actor(None, Some(slot.clone()));
8359
8360 let runtime = tokio::runtime::Builder::new_multi_thread()
8361 .worker_threads(2)
8362 .enable_all()
8363 .build()
8364 .unwrap();
8365
8366 let (lock_held_tx, lock_held_rx) =
8367 std::sync::mpsc::sync_channel::<()>(1);
8368 let (release_tx, release_rx) =
8369 tokio::sync::oneshot::channel::<()>();
8370 let slot_for_holder = slot.clone();
8371 let holder = runtime.spawn(async move {
8372 let guard = slot_for_holder.write().await;
8373 let _ = lock_held_tx.send(());
8374 let _ = release_rx.await;
8375 drop(guard);
8376 });
8377
8378 lock_held_rx
8379 .recv()
8380 .expect("holder must signal lock acquisition");
8381
8382 let under_contention = actor.current_steward();
8385 assert!(
8386 under_contention.is_none(),
8387 "contention + no self.steward fallback => current_steward returns None"
8388 );
8389
8390 let _ = release_tx.send(());
8391 runtime.block_on(holder).unwrap();
8392
8393 let after = actor
8395 .current_steward()
8396 .expect("post-release: slot populated");
8397 assert!(Arc::ptr_eq(&after, &slot_steward));
8398 }
8399 }
8400
8401 mod p4b_no_inline_llm_pins {
8406 use super::*;
8407 use crate::embedder_registry::{EmbedderIdentity, get_or_insert_embedder_id};
8408 use crate::test_support::{StubVectorIndex, fixture_embedding, fixture_episode};
8409 use crate::writer::ConsolidationScope;
8410 use solo_steward::test_support::StubLlmClient;
8411 use solo_steward::{Steward, StewardConfig};
8412 use std::sync::Arc;
8413 use std::time::{Duration as StdDuration, Instant};
8414 use tempfile::TempDir;
8415
8416 fn rt_multi() -> tokio::runtime::Runtime {
8417 tokio::runtime::Builder::new_multi_thread()
8418 .worker_threads(2)
8419 .enable_all()
8420 .build()
8421 .unwrap()
8422 }
8423
/// Pins that a `consolidate` command on a steward-equipped writer returns in
/// under 100 ms, builds at least one cluster, and reports zero abstractions
/// and zero triples built.
#[test]
fn consolidate_command_returns_quickly_without_blocking_on_llm() {
    use crate::test_support::open_test_db_at;

    /// F32 embedding of `dim` components: the first is `1.0`, the rest are
    /// `0.0`. Every episode in this test gets this same vector.
    fn aligned_embedding(dim: usize) -> Embedding {
        let mut data = vec![0u8; dim * 4];
        data[..4].copy_from_slice(&1.0f32.to_le_bytes());
        Embedding {
            dtype: solo_core::EmbeddingDtype::F32,
            dim,
            data,
        }
    }

    let tmp = TempDir::new().unwrap();
    let db_path = tmp.path().join("test.db");
    let dim = 4usize;

    // Register the embedder through a short-lived connection that is closed
    // before the writer opens its own.
    let embedder_id = {
        let setup_conn = open_test_db_at(&db_path);
        let identity = EmbedderIdentity {
            name: "stub".into(),
            version: "v1".into(),
            dim: dim as u32,
            dtype: "f32".into(),
        };
        get_or_insert_embedder_id(&setup_conn, &identity).unwrap()
    };

    let llm = Arc::new(StubLlmClient::default_stub().pretend_real_llm(true));
    let steward = Some(Arc::new(Steward::new(llm, StewardConfig::default())));

    let runtime = rt_multi();
    runtime.block_on(async {
        let conn = open_test_db_at(&db_path);
        let hnsw = Arc::new(StubVectorIndex::new(dim));
        let embedder: Arc<dyn solo_core::Embedder> =
            Arc::new(crate::embedder::StubEmbedder::new("stub", "v1", dim));
        let WriterSpawn { handle, join } =
            WriterActor::spawn_full_with_embedder_and_optional_steward(
                conn,
                hnsw,
                tmp.path().to_path_buf(),
                embedder_id,
                embedder,
                steward,
            );

        // Three episodes one second apart, all with the same embedding.
        for i in 0..3 {
            let mut episode = fixture_episode(&format!("e{i}"));
            episode.ts_ms = 1_700_000_000_000 + (i as i64) * 1000;
            handle
                .remember(episode, aligned_embedding(dim))
                .await
                .unwrap();
        }

        // Time only the consolidate round trip.
        let started = Instant::now();
        let consolidation = handle.consolidate(ConsolidationScope::default()).await;
        let elapsed = started.elapsed();
        let report = consolidation.expect("consolidate ok");

        assert!(
            elapsed < StdDuration::from_millis(100),
            "consolidate took {elapsed:?}; pre-P4 it ran the LLM \
             loop inline. Post-P4 it MUST NOT — the writer-actor's \
             command path stays off the LLM critical path. (If the \
             pin fires the lesson is: the v0.8.x `block_on` regressed.)"
        );
        assert!(
            report.clusters_built >= 1,
            "clustering pass should at least try; got {:?}",
            report,
        );
        assert_eq!(
            report.abstractions_built, 0,
            "writer-actor must NOT build abstractions inline"
        );
        assert_eq!(
            report.triples_built, 0,
            "writer-actor must NOT extract triples inline"
        );

        // Drop the handle, then join the writer thread off the async
        // executor so the blocking join does not stall a worker.
        drop(handle);
        tokio::task::spawn_blocking(move || join.join().unwrap())
            .await
            .unwrap();
    });
}
8542
/// Pins that triple extraction stays out of the writer-actor's command path.
///
/// A steward backed by a canned stub-LLM response (which carries a "ghost"
/// triple) is wired into the spawned actor. The test checks that `triples`
/// is empty after a lone `remember`, and that both `triples` and
/// `semantic_abstractions` are still empty after three more episodes
/// followed by a `consolidate` call.
#[test]
fn triples_extraction_does_not_happen_in_writer_actor_command_path() {
    use crate::test_support::open_test_db_at;

    let tmp = TempDir::new().unwrap();
    let path = tmp.path().join("test.db");
    let dim = 4usize;

    // Register the stub embedder on a short-lived connection; it is closed
    // again before the writer is spawned on its own connection.
    let identity = EmbedderIdentity {
        name: "stub".into(),
        version: "v1".into(),
        dim: dim as u32,
        dtype: "f32".into(),
    };
    let embedder_id = {
        let setup_conn = open_test_db_at(&path);
        get_or_insert_embedder_id(&setup_conn, &identity).unwrap()
    };

    // Canned LLM answer containing a triple that must never reach the
    // database through the writer-actor.
    let canned = r#"{
        "content": "Inline triples MUST NOT land via the writer-actor.",
        "confidence": 0.9,
        "triples": [
            { "subject_id": "ghost", "predicate": "should_not", "object_id": "exist", "object_kind": "literal" }
        ]
    }"#;
    let stub_client = StubLlmClient::with_canned("stub-llm", canned).pretend_real_llm(true);
    let llm = Arc::new(stub_client);
    let steward = Some(Arc::new(Steward::new(llm, StewardConfig::default())));

    let runtime = rt_multi();
    runtime.block_on(async {
        let conn = open_test_db_at(&path);
        let hnsw = Arc::new(StubVectorIndex::new(dim));
        let embedder: Arc<dyn solo_core::Embedder> =
            Arc::new(crate::embedder::StubEmbedder::new("stub", "v1", dim));
        let WriterSpawn { handle, join } =
            WriterActor::spawn_full_with_embedder_and_optional_steward(
                conn,
                hnsw,
                tmp.path().to_path_buf(),
                embedder_id,
                embedder,
                steward,
            );

        // Phase 1: a single `remember` must not produce any triple rows.
        handle
            .remember(fixture_episode("remember-only"), fixture_embedding(dim))
            .await
            .unwrap();
        {
            let reader = open_test_db_at(&path);
            let triple_rows: i64 = reader
                .query_row("SELECT COUNT(*) FROM triples", [], |row| row.get(0))
                .unwrap();
            assert_eq!(
                triple_rows, 0,
                "Remember command must NOT write any triple rows; \
                 triples land later via AttachAbstractionBatch"
            );
        }

        // Phase 2: three more episodes one second apart, then consolidate.
        for offset in 1i64..4 {
            let mut ep = fixture_episode(&format!("c{offset}"));
            ep.ts_ms = 1_700_000_000_000 + offset * 1000;
            handle
                .remember(ep, fixture_embedding(dim))
                .await
                .unwrap();
        }
        let _report = handle
            .consolidate(ConsolidationScope::default())
            .await
            .unwrap();
        {
            // One fresh read connection serves both post-consolidate counts.
            let reader = open_test_db_at(&path);
            let count = |sql: &str| -> i64 {
                reader.query_row(sql, [], |row| row.get(0)).unwrap()
            };

            let triple_rows = count("SELECT COUNT(*) FROM triples");
            assert_eq!(
                triple_rows, 0,
                "writer-actor's Consolidate command must NOT \
                 extract triples even with a canned-response Steward \
                 wired (pre-P4 it would write the 'ghost' triple)"
            );

            let abstraction_rows = count("SELECT COUNT(*) FROM semantic_abstractions");
            assert_eq!(
                abstraction_rows, 0,
                "matching pin: writer-actor's Consolidate must NOT \
                 write semantic_abstractions inline either"
            );
        }

        // Shut down: release the handle, then wait for the writer thread on
        // a blocking task so the async executor is not stalled by the join.
        drop(handle);
        tokio::task::spawn_blocking(move || join.join().unwrap())
            .await
            .unwrap();
    });
}
8670 }
8671
8672 mod p0_remember_batch_tests {
8687 use super::*;
8688 use crate::test_support::{
8689 StubVectorIndex, disabled_test_redactor, fixture_embedding, fixture_episode,
8690 open_test_db,
8691 };
8692
8693 fn build_actor() -> WriterActor {
8694 let (conn, _tmp) = open_test_db();
8695 std::mem::forget(_tmp);
8696 let hnsw: Arc<dyn solo_core::VectorIndex + Send + Sync> =
8697 Arc::new(StubVectorIndex::new(4));
8698 let (_tx, rx) = mpsc::channel(1);
8699 WriterActor {
8700 conn,
8701 hnsw,
8702 rx,
8703 snapshot_dir: None,
8704 embedder_id: None,
8705 embedder: None,
8706 runtime_handle: None,
8707 steward: None,
8708 steward_slot: None,
8709 triples_batch_signal: None,
8710 key: None,
8711 redactor: disabled_test_redactor(),
8712 quota_bytes: None,
8713 db_path: None,
8714 invalidate_tx: None,
8715 invalidate_tenant_id: None,
8716 }
8717 }
8718
/// An empty batch must be answered with `Error::InvalidInput`.
#[test]
fn dispatch_remember_batch_empty_returns_invalid_input() {
    let (reply_tx, reply_rx) = oneshot::channel();
    let mut actor = build_actor();

    actor.dispatch_remember_batch(Vec::new(), None, reply_tx);

    let outcome = reply_rx.blocking_recv().unwrap();
    let rejected = matches!(outcome, Err(solo_core::Error::InvalidInput(_)));
    assert!(
        rejected,
        "empty batch must reject with InvalidInput, got: {outcome:?}"
    );
}
8730
/// A batch of `MAX_REMEMBER_BATCH_SIZE + 1` items must be rejected with
/// `Error::InvalidInput`, and the error text must name the cap.
#[test]
fn dispatch_remember_batch_over_cap_returns_invalid_input() {
    // The inclusive range yields exactly one item more than the cap allows.
    let oversized: Vec<(Episode, Embedding)> = (0..=MAX_REMEMBER_BATCH_SIZE)
        .map(|i| {
            let episode = fixture_episode(&format!("over-cap-{i}"));
            (episode, fixture_embedding(4))
        })
        .collect();

    let mut actor = build_actor();
    let (reply_tx, reply_rx) = oneshot::channel();
    actor.dispatch_remember_batch(oversized, None, reply_tx);

    let outcome = reply_rx.blocking_recv().unwrap();
    let Err(solo_core::Error::InvalidInput(msg)) = &outcome else {
        panic!("expected InvalidInput, got: {outcome:?}");
    };
    assert!(
        msg.contains("MAX_REMEMBER_BATCH_SIZE"),
        "error must reference the cap; got: {msg}"
    );
}
8750
/// Happy path for a five-item batch dispatched as principal "alice".
///
/// Checks, in order: the returned ids keep input order; five episode rows
/// exist; each stored salience matches what was submitted; exactly one
/// `memory.remember_batch` audit row with result `ok` was written; its
/// `details_json.item_count` is 5; and `pending_index` is empty afterwards.
#[test]
fn dispatch_remember_batch_inserts_all_items_in_one_tx() {
    let mut actor = build_actor();

    // Give every item a distinct salience so a mix-up between rows would be
    // visible in the round-trip check below.
    let mut items: Vec<(Episode, Embedding)> = Vec::with_capacity(5);
    for i in 0..5 {
        let mut ep = fixture_episode(&format!("batch-item-{i}"));
        ep.salience = 0.1 + (i as f32) * 0.15;
        items.push((ep, fixture_embedding(4)));
    }
    let (expected_ids, expected_saliences): (Vec<MemoryId>, Vec<f32>) = items
        .iter()
        .map(|(ep, _)| (ep.memory_id, ep.salience))
        .unzip();

    let (reply_tx, reply_rx) = oneshot::channel();
    actor.dispatch_remember_batch(items, Some("alice".into()), reply_tx);
    let ids = reply_rx.blocking_recv().unwrap().unwrap();

    assert_eq!(ids, expected_ids, "memory_ids must preserve input order");

    // Shared helper for the single-value COUNT(*) probes.
    let count = |sql: &str| -> i64 {
        actor.conn.query_row(sql, [], |row| row.get(0)).unwrap()
    };

    let episode_rows = count("SELECT COUNT(*) FROM episodes");
    assert_eq!(episode_rows, 5, "5 episode rows expected");

    for (id, expected) in expected_ids.iter().zip(expected_saliences.iter()) {
        let stored: f32 = actor
            .conn
            .query_row(
                "SELECT salience FROM episodes WHERE memory_id = ?",
                params![id.to_string()],
                |row| row.get(0),
            )
            .unwrap();
        assert!(
            (stored - expected).abs() < 1e-5,
            "salience round-trip mismatch: got {stored}, expected {expected}",
        );
    }

    let ok_audit_rows = count(
        "SELECT COUNT(*) FROM audit_events \
         WHERE operation = 'memory.remember_batch' \
         AND result = 'ok'",
    );
    assert_eq!(
        ok_audit_rows, 1,
        "exactly one batch-level audit row per call (dev-log 0120 §3 Decision G)"
    );

    let details: String = actor
        .conn
        .query_row(
            "SELECT details_json FROM audit_events \
             WHERE operation = 'memory.remember_batch' \
             ORDER BY audit_id DESC LIMIT 1",
            [],
            |row| row.get(0),
        )
        .unwrap();
    let parsed: serde_json::Value = serde_json::from_str(&details).unwrap();
    assert_eq!(
        parsed["item_count"], 5,
        "details_json.item_count must reflect the batch size"
    );

    let pending_rows = count("SELECT COUNT(*) FROM pending_index");
    assert_eq!(
        pending_rows, 0,
        "pending_index must be drained after a successful batch"
    );
}
8842
/// When the caller passes no principal, the newest `memory.remember_batch`
/// audit row must store NULL in `principal_subject`.
#[test]
fn dispatch_remember_batch_with_no_principal_persists_null_principal() {
    let mut actor = build_actor();

    let mut items: Vec<(Episode, Embedding)> = Vec::with_capacity(3);
    for i in 0..3 {
        items.push((
            fixture_episode(&format!("no-principal-{i}")),
            fixture_embedding(4),
        ));
    }

    let (reply_tx, reply_rx) = oneshot::channel();
    actor.dispatch_remember_batch(items, None, reply_tx);
    assert!(reply_rx.blocking_recv().unwrap().is_ok());

    // Reading into `Option<String>` turns a SQL NULL into `None`.
    let newest_audit_principal = "SELECT principal_subject FROM audit_events \
                                  WHERE operation = 'memory.remember_batch' \
                                  ORDER BY audit_id DESC LIMIT 1";
    let stored: Option<String> = actor
        .conn
        .query_row(newest_audit_principal, [], |row| row.get(0))
        .unwrap();
    assert!(
        stored.is_none(),
        "audit row principal must be NULL when caller passed None"
    );
}
8868
/// With `quota_bytes` set to 10 and `db_path` pointing at the test database,
/// a three-item batch must be refused with `Error::Forbidden`, leave exactly
/// one `forbidden` audit row, and insert no episode rows.
#[test]
fn dispatch_remember_batch_quota_exceeded_returns_forbidden() {
    let (conn, tmp) = open_test_db();
    let db_path = tmp.path().join("test.db");
    // Leak the guard on purpose so its destructor never runs.
    // NOTE(review): presumably this keeps the temp directory on disk for the
    // lifetime of `conn` — confirm against `open_test_db`.
    std::mem::forget(tmp);

    let index: Arc<dyn solo_core::VectorIndex + Send + Sync> =
        Arc::new(StubVectorIndex::new(4));
    let (_sender, receiver) = mpsc::channel(1);
    let mut actor = WriterActor {
        conn,
        hnsw: index,
        rx: receiver,
        redactor: disabled_test_redactor(),
        // The two settings under test: a tiny quota and the file it applies to.
        quota_bytes: Some(10),
        db_path: Some(db_path),
        // Everything else stays unset.
        snapshot_dir: None,
        embedder_id: None,
        embedder: None,
        runtime_handle: None,
        steward: None,
        steward_slot: None,
        triples_batch_signal: None,
        key: None,
        invalidate_tx: None,
        invalidate_tenant_id: None,
    };

    let mut items: Vec<(Episode, Embedding)> = Vec::with_capacity(3);
    for i in 0..3 {
        items.push((
            fixture_episode(&format!("quota-batch-{i}")),
            fixture_embedding(4),
        ));
    }

    let (reply_tx, reply_rx) = oneshot::channel();
    actor.dispatch_remember_batch(items, Some("alice".into()), reply_tx);
    let outcome = reply_rx.blocking_recv().unwrap();
    let refused = matches!(outcome, Err(solo_core::Error::Forbidden(_)));
    assert!(
        refused,
        "over-quota batch must reject with Forbidden, got: {outcome:?}"
    );

    // Shared helper for the single-value COUNT(*) probes.
    let count = |sql: &str| -> i64 {
        actor.conn.query_row(sql, [], |row| row.get(0)).unwrap()
    };

    let forbidden_audit_rows = count(
        "SELECT COUNT(*) FROM audit_events \
         WHERE operation = 'memory.remember_batch' \
         AND result = 'forbidden'",
    );
    assert_eq!(
        forbidden_audit_rows, 1,
        "exactly one forbidden audit row for the over-quota batch"
    );

    let episode_rows = count("SELECT COUNT(*) FROM episodes");
    assert_eq!(
        episode_rows, 0,
        "Forbidden return must NOT leak episode rows (no BEGIN was opened)"
    );
}
8936
/// End-to-end check through a spawned writer: a four-item batch submitted
/// via `remember_batch_as` must reach the vector index as four `add` calls.
///
/// The writer thread is joined before the final assertion, so the observed
/// `add_count` does not depend on wall-clock timing.
#[test]
fn remember_batch_invokes_hnsw_add_per_item() {
    // `_tmp` stays bound until the end of the test so the temp-dir guard
    // lives for as long as the writer does.
    let (conn, _tmp) = open_test_db();
    let hnsw = Arc::new(StubVectorIndex::new(4));
    let WriterSpawn { handle, join } = WriterActor::spawn(conn, hnsw.clone());

    let items: Vec<(Episode, Embedding)> = (0..4)
        .map(|i| (fixture_episode(&format!("hnsw-batch-{i}")), fixture_embedding(4)))
        .collect();

    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    let ids = rt
        .block_on(handle.remember_batch_as(Some("alice".into()), items))
        .unwrap();
    assert_eq!(ids.len(), 4);

    // Same shutdown sequence as the other spawned-writer tests in this file:
    // release the handle, then wait for the writer thread to finish. This
    // replaces two fixed 50 ms sleeps, which could race with the writer on a
    // loaded machine and left the thread detached.
    drop(handle);
    join.join().expect("writer thread panicked");

    assert_eq!(
        hnsw.add_count(),
        4,
        "hnsw.add must run once per batched item"
    );
}
8972 }
8973}