Skip to main content

post_cortex_memory/
services.rs

1// Copyright (c) 2025, 2026 Julius ML
2// Licensed under the MIT License. See LICENSE at the workspace root.
3
4//! Canonical [`PostCortexService`] implementation for post-cortex-memory.
5//!
6//! [`MemoryServiceImpl`] is a thin adapter over
7//! [`ConversationMemorySystem`] that fulfils the
8//! [`post_cortex_core::services::PostCortexService`] trait. Phases 6 + 7
9//! migrate the daemon's gRPC handlers and the MCP tool layer to call
10//! this trait instead of `ConversationMemorySystem` directly — at that
11//! point every read / write / search / manage operation flows through
12//! one canonical function (TODO.md:106-117).
13//!
14//! Right now this impl is **incomplete on purpose**: most methods return
15//! a `SystemError::Internal("not yet wired …")` while the legacy
16//! transport handlers continue to call `ConversationMemorySystem` paths
17//! directly. The trait surface is stable; the impl fills in
18//! incrementally as Phases 6 + 7 migrate their callers. This avoids a
19//! single massive commit that's impossible to bisect.
20
21use std::collections::HashSet;
22use std::sync::Arc;
23
24use async_trait::async_trait;
25use chrono::Utc;
26use post_cortex_core::core::context_update::{ContextUpdate, EntityRelationship, TypedEntity};
27use post_cortex_core::core::error::SystemError;
28use post_cortex_core::services::{
29    AdminRequest, AdminResponse, AssembleContextRequest, AssembleContextResponse,
30    BulkUpdateContextRequest, BulkUpdateContextResponse, HealthReport, ManageEntityRequest,
31    ManageEntityResponse, ManageSessionRequest, ManageSessionResponse, ManageWorkspaceRequest,
32    ManageWorkspaceResponse, PostCortexService, QueryContextRequest, QueryContextResponse,
33    SemanticSearchRequest, SemanticSearchResponse, StructuredSummaryRequest,
34    StructuredSummaryResponse, UpdateContextRequest, UpdateContextResponse,
35};
36use tracing::warn;
37use uuid::Uuid;
38
39use crate::memory_system::ConversationMemorySystem;
40use crate::pipeline::{
41    EmbeddingWorkItem, GraphWorkItem, Pipeline, PipelineConfig, PipelineError, SummaryWorkItem,
42};
43
44/// Canonical [`PostCortexService`] implementation backed by
45/// [`ConversationMemorySystem`].
46///
47/// Construct with [`Self::new`] passing an `Arc<ConversationMemorySystem>`
48/// (typically the singleton owned by the daemon).
49pub struct MemoryServiceImpl {
50    system: Arc<ConversationMemorySystem>,
51    pipeline: Arc<Pipeline>,
52}
53
54impl MemoryServiceImpl {
55    /// Wrap an existing memory system in the canonical service trait,
56    /// starting the non-blocking write pipeline with default capacities.
57    /// Must be called from inside a Tokio runtime.
58    #[must_use]
59    pub fn new(system: Arc<ConversationMemorySystem>) -> Self {
60        Self::with_pipeline_config(system, PipelineConfig::default())
61    }
62
63    /// Wrap an existing memory system in the service trait with an
64    /// explicit pipeline configuration.
65    #[must_use]
66    pub fn with_pipeline_config(
67        system: Arc<ConversationMemorySystem>,
68        config: PipelineConfig,
69    ) -> Self {
70        let pipeline = Arc::new(Pipeline::start(config, Arc::clone(&system)));
71        Self { system, pipeline }
72    }
73
74    /// Borrow the non-blocking pipeline. Exposed so transport adapters
75    /// can submit derived-work items directly (e.g. enqueue an
76    /// embedding compute after a manual storage write).
77    #[must_use]
78    pub fn pipeline(&self) -> &Arc<Pipeline> {
79        &self.pipeline
80    }
81
82    /// Borrow the underlying memory system. Provided so the daemon's
83    /// gRPC handlers can fall back to direct calls for methods that
84    /// have not been migrated to the trait yet.
85    #[must_use]
86    pub fn inner(&self) -> &Arc<ConversationMemorySystem> {
87        &self.system
88    }
89
90    fn not_yet_wired<T>(op: &'static str) -> Result<T, SystemError> {
91        Err(SystemError::Internal(format!(
92            "PostCortexService::{op} is not yet wired — migration lands in Phase 6 (MCP) / Phase 7 (daemon). \
93             Use ConversationMemorySystem directly until then."
94        )))
95    }
96}
97
98#[async_trait]
99impl PostCortexService for MemoryServiceImpl {
100    #[tracing::instrument(skip(self), name = "post_cortex.health")]
101    async fn health(&self) -> Result<HealthReport, SystemError> {
102        let health = self.system.get_system_health();
103        Ok(HealthReport {
104            status: if health.circuit_breaker_open {
105                "degraded".to_string()
106            } else {
107                "ok".to_string()
108            },
109            active_sessions: health.active_sessions,
110            // SystemHealth does not yet expose memory bytes — wired in
111            // Phase 11 (perf observability). For now report 0.
112            memory_usage_bytes: 0,
113            pipeline_backlog: self.pipeline.backlog(),
114            uptime_seconds: health.uptime_seconds,
115        })
116    }
117
118    #[tracing::instrument(
119        skip(self, req),
120        fields(
121            session_id = %req.session_id,
122            interaction_type = ?req.interaction_type,
123            entities = req.entities.len(),
124            relations = req.relations.len(),
125        ),
126        name = "post_cortex.update_context",
127    )]
128    async fn update_context(
129        &self,
130        req: UpdateContextRequest,
131    ) -> Result<UpdateContextResponse, SystemError> {
132        validate_update_request(&req)?;
133
134        let description = build_description(&req);
135        let context_update = build_context_update(&req);
136        let metadata =
137            serde_json::to_value(&context_update).expect("ContextUpdate serialization cannot fail");
138        let session_id = req.session_id;
139
140        let entry_id_str = self
141            .system
142            .add_incremental_update(session_id, description.clone(), Some(metadata))
143            .await
144            .map_err(SystemError::Internal)?;
145
146        let entry_id = Uuid::parse_str(&entry_id_str).map_err(|e| {
147            SystemError::Internal(format!(
148                "storage returned non-UUID entry id {entry_id_str:?}: {e}"
149            ))
150        })?;
151
152        // Hand derived work to the bounded pipeline. Best-effort: a full
153        // queue logs `Backpressure` but doesn't fail the durable write
154        // — the legacy in-system `tokio::spawn`s in
155        // `ConversationMemorySystem::add_incremental_update_internal`
156        // remain as the safety net until they're fully retired in 0.4.0.
157        submit_derived_work(
158            &self.pipeline,
159            session_id,
160            entry_id,
161            &description,
162            context_update,
163        );
164
165        Ok(UpdateContextResponse {
166            entry_id,
167            session_id,
168            persisted_at: Utc::now(),
169            durable: true,
170        })
171    }
172
173    #[tracing::instrument(
174        skip(self, req),
175        fields(session_id = %req.session_id, batch_size = req.updates.len()),
176        name = "post_cortex.bulk_update_context",
177    )]
178    async fn bulk_update_context(
179        &self,
180        req: BulkUpdateContextRequest,
181    ) -> Result<BulkUpdateContextResponse, SystemError> {
182        // Pre-validate the whole batch first so a bad item doesn't leave us
183        // with a half-applied write (storage-actor batching lands later;
184        // until then we still fail-fast on validation but persist
185        // sequentially).
186        for (i, item) in req.updates.iter().enumerate() {
187            if item.session_id != req.session_id {
188                return Err(SystemError::InvalidArgument(format!(
189                    "bulk_update_context: updates[{i}].session_id {} does not match request session_id {}",
190                    item.session_id, req.session_id
191                )));
192            }
193            validate_update_request(item).map_err(|e| match e {
194                SystemError::InvalidArgument(msg) => {
195                    SystemError::InvalidArgument(format!("updates[{i}]: {msg}"))
196                }
197                other => other,
198            })?;
199        }
200
201        let mut entry_ids = Vec::with_capacity(req.updates.len());
202        for (i, item) in req.updates.iter().enumerate() {
203            let description = build_description(item);
204            let context_update = build_context_update(item);
205            let metadata = serde_json::to_value(&context_update)
206                .expect("ContextUpdate serialization cannot fail");
207            let entry_id_str = self
208                .system
209                .add_incremental_update(item.session_id, description.clone(), Some(metadata))
210                .await
211                .map_err(|e| SystemError::Internal(format!("updates[{i}]: {e}")))?;
212            let entry_id = Uuid::parse_str(&entry_id_str).map_err(|e| {
213                SystemError::Internal(format!(
214                    "updates[{i}]: storage returned non-UUID entry id {entry_id_str:?}: {e}"
215                ))
216            })?;
217            entry_ids.push(entry_id);
218            submit_derived_work(
219                &self.pipeline,
220                item.session_id,
221                entry_id,
222                &description,
223                context_update,
224            );
225        }
226
227        Ok(BulkUpdateContextResponse {
228            entry_ids,
229            persisted_at: Utc::now(),
230            durable: true,
231        })
232    }
233
234    #[tracing::instrument(skip(self, _req), name = "post_cortex.semantic_search")]
235    async fn semantic_search(
236        &self,
237        _req: SemanticSearchRequest,
238    ) -> Result<SemanticSearchResponse, SystemError> {
239        Self::not_yet_wired("semantic_search")
240    }
241
242    #[tracing::instrument(skip(self, _req), name = "post_cortex.query_context")]
243    async fn query_context(
244        &self,
245        _req: QueryContextRequest,
246    ) -> Result<QueryContextResponse, SystemError> {
247        Self::not_yet_wired("query_context")
248    }
249
250    #[tracing::instrument(skip(self, _req), name = "post_cortex.assemble_context")]
251    async fn assemble_context(
252        &self,
253        _req: AssembleContextRequest,
254    ) -> Result<AssembleContextResponse, SystemError> {
255        Self::not_yet_wired("assemble_context")
256    }
257
258    #[tracing::instrument(skip(self, _req), name = "post_cortex.manage_session")]
259    async fn manage_session(
260        &self,
261        _req: ManageSessionRequest,
262    ) -> Result<ManageSessionResponse, SystemError> {
263        Self::not_yet_wired("manage_session")
264    }
265
266    #[tracing::instrument(skip(self, _req), name = "post_cortex.manage_workspace")]
267    async fn manage_workspace(
268        &self,
269        _req: ManageWorkspaceRequest,
270    ) -> Result<ManageWorkspaceResponse, SystemError> {
271        Self::not_yet_wired("manage_workspace")
272    }
273
274    #[tracing::instrument(skip(self, _req), name = "post_cortex.manage_entity")]
275    async fn manage_entity(
276        &self,
277        _req: ManageEntityRequest,
278    ) -> Result<ManageEntityResponse, SystemError> {
279        Self::not_yet_wired("manage_entity")
280    }
281
282    #[tracing::instrument(skip(self, _req), name = "post_cortex.get_structured_summary")]
283    async fn get_structured_summary(
284        &self,
285        _req: StructuredSummaryRequest,
286    ) -> Result<StructuredSummaryResponse, SystemError> {
287        Self::not_yet_wired("get_structured_summary")
288    }
289
290    #[tracing::instrument(skip(self, _req), name = "post_cortex.admin")]
291    async fn admin(&self, _req: AdminRequest) -> Result<AdminResponse, SystemError> {
292        Self::not_yet_wired("admin")
293    }
294}
295
296// ---------------------------------------------------------------------------
297// Canonical helpers — single source of truth for update_context validation
298// and metadata shaping. Every transport (gRPC, MCP, REST) reaches the same
299// behaviour by going through `update_context` / `bulk_update_context`; the
300// helpers below are intentionally private so transports cannot bypass them.
301// ---------------------------------------------------------------------------
302
303/// Strict validation lifted from the legacy gRPC helper. Rejects any input
304/// that would corrupt the entity graph (empty title+description, missing
305/// entities/relations, dangling relation endpoints, self-relations, empty
306/// relation context).
307fn validate_update_request(req: &UpdateContextRequest) -> Result<(), SystemError> {
308    if req.content.title.trim().is_empty() && req.content.description.trim().is_empty() {
309        return Err(SystemError::InvalidArgument(
310            "update_context: title and description are both empty — provide at least one".into(),
311        ));
312    }
313    if req.entities.is_empty() {
314        return Err(SystemError::InvalidArgument(
315            "update_context: entities must not be empty".into(),
316        ));
317    }
318    if req.relations.is_empty() {
319        return Err(SystemError::InvalidArgument(
320            "update_context: relations must not be empty".into(),
321        ));
322    }
323
324    let entity_names: HashSet<&str> = req.entities.iter().map(|e| e.name.as_str()).collect();
325    for (i, rel) in req.relations.iter().enumerate() {
326        if rel.from_entity.is_empty() {
327            return Err(SystemError::InvalidArgument(format!(
328                "relation[{i}]: from_entity must not be empty"
329            )));
330        }
331        if rel.to_entity.is_empty() {
332            return Err(SystemError::InvalidArgument(format!(
333                "relation[{i}]: to_entity must not be empty"
334            )));
335        }
336        if rel.from_entity == rel.to_entity {
337            return Err(SystemError::InvalidArgument(format!(
338                "relation[{i}]: self-relations are not allowed (from_entity == to_entity == {:?})",
339                rel.from_entity
340            )));
341        }
342        if rel.context.trim().is_empty() {
343            return Err(SystemError::InvalidArgument(format!(
344                "relation[{i}]: context must not be empty — every relation requires an explanation"
345            )));
346        }
347        if !entity_names.contains(rel.from_entity.as_str()) {
348            return Err(SystemError::InvalidArgument(format!(
349                "relation[{i}]: from_entity {:?} is not declared in the entities list",
350                rel.from_entity
351            )));
352        }
353        if !entity_names.contains(rel.to_entity.as_str()) {
354            return Err(SystemError::InvalidArgument(format!(
355                "relation[{i}]: to_entity {:?} is not declared in the entities list",
356                rel.to_entity
357            )));
358        }
359    }
360
361    Ok(())
362}
363
364/// Build the text fed to the vectorizer — `title\n description` matches what
365/// the MCP path settled on for cross-session search quality.
366fn build_description(req: &UpdateContextRequest) -> String {
367    if req.content.description.is_empty() {
368        req.content.title.clone()
369    } else if req.content.title.is_empty() {
370        req.content.description.clone()
371    } else {
372        format!("{}\n{}", req.content.title, req.content.description)
373    }
374}
375
376/// Build the `ContextUpdate` JSON blob stored as the storage entry's metadata.
377/// Keeps `creates_entities` (names) and `typed_entities` (name + type) in sync
378/// — both are consumed downstream by the entity graph builder.
379fn build_context_update(req: &UpdateContextRequest) -> ContextUpdate {
380    let typed_entities: Vec<TypedEntity> = req
381        .entities
382        .iter()
383        .map(|e| TypedEntity {
384            name: e.name.clone(),
385            entity_type: e.entity_type.clone(),
386        })
387        .collect();
388    let creates_entities: Vec<String> = req.entities.iter().map(|e| e.name.clone()).collect();
389    let creates_relationships: Vec<EntityRelationship> = req.relations.clone();
390    let related_code = req.code_reference.clone();
391
392    ContextUpdate {
393        id: Uuid::new_v4(),
394        timestamp: Utc::now(),
395        update_type: req.interaction_type.clone(),
396        content: req.content.clone(),
397        related_code,
398        parent_update: None,
399        user_marked_important: false,
400        creates_entities,
401        creates_relationships,
402        references_entities: Vec::new(),
403        typed_entities,
404    }
405}
406
407/// Hand a freshly-persisted update off to the bounded background pipeline:
408/// embedding compute, entity-graph merge, and summary refresh all run on
409/// separate worker tasks so the write path returns as soon as the entry
410/// is durably stored. Backpressure on any queue is logged but swallowed
411/// — the legacy in-system `tokio::spawn`s in
412/// [`ConversationMemorySystem::add_incremental_update_internal`] still
413/// run as a safety net, so a full queue degrades to "do the work on a
414/// raw spawn anyway" rather than dropping it.
415fn submit_derived_work(
416    pipeline: &Pipeline,
417    session_id: Uuid,
418    entry_id: Uuid,
419    text: &str,
420    update: ContextUpdate,
421) {
422    if let Err(e) = pipeline.submit_embedding(EmbeddingWorkItem {
423        session_id,
424        entry_id,
425        text: text.to_string(),
426    }) {
427        log_pipeline_submit("embedding", session_id, entry_id, e);
428    }
429    if let Err(e) = pipeline.submit_graph(GraphWorkItem::ApplyUpdate { session_id, update }) {
430        log_pipeline_submit("graph", session_id, entry_id, e);
431    }
432    if let Err(e) = pipeline.submit_summary(SummaryWorkItem { session_id }) {
433        log_pipeline_submit("summary", session_id, entry_id, e);
434    }
435}
436
437fn log_pipeline_submit(queue: &str, session_id: Uuid, entry_id: Uuid, err: PipelineError) {
438    warn!(
439        queue,
440        %session_id,
441        %entry_id,
442        error = %err,
443        "pipeline submission failed (non-fatal — legacy in-system spawn covers the work)"
444    );
445}
446
#[cfg(test)]
mod tests {
    use super::*;
    use crate::memory_system::SystemConfig;
    use chrono::Utc;
    use post_cortex_core::core::context_update::{
        EntityData, EntityType, RelationType, UpdateContent, UpdateType,
    };

    /// Scratch data directory that is removed when the guard is dropped.
    ///
    /// Cleanup runs from `Drop`, so it also happens when a test panics on a
    /// failed assertion; removal errors are ignored because the directory
    /// is disposable.
    struct TestDir(String);

    impl Drop for TestDir {
        fn drop(&mut self) {
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    /// Concept entity with the given name and neutral bookkeeping values.
    fn entity(name: &str) -> EntityData {
        EntityData {
            name: name.to_string(),
            entity_type: EntityType::Concept,
            first_mentioned: Utc::now(),
            last_mentioned: Utc::now(),
            mention_count: 1,
            importance_score: 1.0,
            description: None,
        }
    }

    /// `RelatedTo` relation between two entity names with a non-empty context.
    fn relation(from: &str, to: &str) -> EntityRelationship {
        EntityRelationship {
            from_entity: from.to_string(),
            to_entity: to.to_string(),
            relation_type: RelationType::RelatedTo,
            context: "test relation".to_string(),
        }
    }

    /// Request that passes every validation rule; tests mutate one field at
    /// a time to hit a specific rejection.
    fn good_request() -> UpdateContextRequest {
        UpdateContextRequest {
            session_id: Uuid::new_v4(),
            interaction_type: UpdateType::ConceptDefined,
            content: UpdateContent {
                title: "Some concept".into(),
                description: "A short definition".into(),
                details: vec![],
                examples: vec![],
                implications: vec![],
            },
            entities: vec![entity("Foo"), entity("Bar")],
            relations: vec![relation("Foo", "Bar")],
            code_reference: None,
        }
    }

    // Compile-time check only: nothing is awaited, so no runtime is needed.
    #[test]
    fn trait_is_object_safe() {
        fn _accept_dyn(_svc: Arc<dyn PostCortexService>) {}
    }

    #[test]
    fn validation_rejects_empty_title_and_description() {
        let mut req = good_request();
        req.content.title = String::new();
        req.content.description = String::new();
        let err = validate_update_request(&req).unwrap_err();
        assert!(matches!(err, SystemError::InvalidArgument(ref m) if m.contains("both empty")));
    }

    #[test]
    fn validation_rejects_empty_entities() {
        let mut req = good_request();
        req.entities = vec![];
        assert!(matches!(
            validate_update_request(&req),
            Err(SystemError::InvalidArgument(_))
        ));
    }

    #[test]
    fn validation_rejects_empty_relations() {
        let mut req = good_request();
        req.relations = vec![];
        assert!(matches!(
            validate_update_request(&req),
            Err(SystemError::InvalidArgument(_))
        ));
    }

    #[test]
    fn validation_rejects_self_relation() {
        let mut req = good_request();
        req.relations = vec![relation("Foo", "Foo")];
        assert!(matches!(
            validate_update_request(&req),
            Err(SystemError::InvalidArgument(ref m)) if m.contains("self-relations")
        ));
    }

    #[test]
    fn validation_rejects_dangling_relation_endpoint() {
        let mut req = good_request();
        req.relations = vec![relation("Foo", "Ghost")];
        assert!(matches!(
            validate_update_request(&req),
            Err(SystemError::InvalidArgument(ref m)) if m.contains("Ghost")
        ));
    }

    #[test]
    fn validation_rejects_empty_relation_context() {
        let mut req = good_request();
        req.relations[0].context = "   ".into();
        assert!(matches!(
            validate_update_request(&req),
            Err(SystemError::InvalidArgument(ref m)) if m.contains("context must not be empty")
        ));
    }

    #[test]
    fn validation_accepts_good_request() {
        assert!(validate_update_request(&good_request()).is_ok());
    }

    #[test]
    fn description_joins_title_and_body() {
        let req = good_request();
        assert_eq!(build_description(&req), "Some concept\nA short definition");
    }

    #[test]
    fn description_falls_back_when_one_side_empty() {
        let mut req = good_request();
        req.content.description = String::new();
        assert_eq!(build_description(&req), "Some concept");

        req.content.title = String::new();
        req.content.description = "Body only".into();
        assert_eq!(build_description(&req), "Body only");
    }

    #[test]
    fn metadata_keeps_creates_entities_in_sync_with_typed_entities() {
        let req = good_request();
        let update = build_context_update(&req);
        let meta = serde_json::to_value(&update).unwrap();
        let names = meta["creates_entities"].as_array().unwrap();
        let typed = meta["typed_entities"].as_array().unwrap();
        assert_eq!(names.len(), typed.len());
        assert_eq!(names.len(), req.entities.len());
        for (i, e) in req.entities.iter().enumerate() {
            assert_eq!(names[i].as_str().unwrap(), e.name);
            assert_eq!(typed[i]["name"].as_str().unwrap(), e.name);
        }
    }

    /// Build a service over a fresh scratch directory.
    ///
    /// The returned guard must be kept alive for the duration of the test;
    /// dropping it deletes the directory.
    async fn make_service(suffix: &str) -> (MemoryServiceImpl, TestDir) {
        let test_dir = TestDir(format!(
            "./test_data_memservice_{}_{}",
            suffix,
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_nanos()
        ));
        std::fs::create_dir_all(&test_dir.0).unwrap();
        let config = SystemConfig {
            data_directory: test_dir.0.clone(),
            ..Default::default()
        };
        let system = Arc::new(ConversationMemorySystem::new(config).await.unwrap());
        (MemoryServiceImpl::new(system), test_dir)
    }

    #[tokio::test]
    async fn health_returns_ok_status() {
        let (svc, _test_dir) = make_service("health").await;
        let report = svc.health().await.unwrap();
        assert!(report.status == "ok" || report.status == "degraded");
    }

    #[tokio::test]
    async fn update_context_persists_and_returns_entry_id() {
        let (svc, _test_dir) = make_service("update").await;
        let session_id = svc.inner().create_session(None, None).await.unwrap();

        let mut req = good_request();
        req.session_id = session_id;
        let resp = svc.update_context(req).await.unwrap();

        assert_eq!(resp.session_id, session_id);
        assert!(resp.durable);
    }

    #[tokio::test]
    async fn update_context_rejects_invalid_input_with_invalid_argument() {
        let (svc, _test_dir) = make_service("invalid").await;
        let session_id = svc.inner().create_session(None, None).await.unwrap();

        let mut req = good_request();
        req.session_id = session_id;
        req.entities = vec![]; // violates "entities must not be empty"
        let err = svc.update_context(req).await.unwrap_err();
        assert!(matches!(err, SystemError::InvalidArgument(_)));
    }

    #[tokio::test]
    async fn bulk_update_context_persists_every_item() {
        let (svc, _test_dir) = make_service("bulk").await;
        let session_id = svc.inner().create_session(None, None).await.unwrap();

        let mut a = good_request();
        a.session_id = session_id;
        a.content.title = "First".into();
        let mut b = good_request();
        b.session_id = session_id;
        b.content.title = "Second".into();

        let resp = svc
            .bulk_update_context(BulkUpdateContextRequest {
                session_id,
                updates: vec![a, b],
            })
            .await
            .unwrap();
        assert_eq!(resp.entry_ids.len(), 2);
        assert!(resp.durable);
    }

    #[tokio::test]
    async fn update_context_returns_fast_then_pipeline_drains_in_background() {
        // The hot path persists durably; embedding + graph + summary land
        // on background workers. Verify that, on a warm path
        // (post-vectorizer-init), update_context returns quickly and the
        // pipeline backlog drains shortly after.
        //
        // The first call still pays the model-download cost via the
        // legacy in-system `spawn_background_vectorization` (kept as a
        // safety net while direct callers migrate to the pipeline path).
        // Phase 7+ removes that path entirely.
        let (svc, _test_dir) = make_service("nonblocking").await;
        let session_id = svc.inner().create_session(None, None).await.unwrap();

        // Warm-up call — absorbs first-time model load.
        let mut warmup = good_request();
        warmup.session_id = session_id;
        warmup.content.title = "warmup".into();
        let _ = svc.update_context(warmup).await.unwrap();

        // Wait for any pending background work to settle.
        for _ in 0..100 {
            if svc.pipeline().backlog() == 0 {
                break;
            }
            tokio::time::sleep(std::time::Duration::from_millis(20)).await;
        }

        // Measured call — should be fast.
        let mut req = good_request();
        req.session_id = session_id;
        let start = std::time::Instant::now();
        let resp = svc.update_context(req).await.unwrap();
        let write_latency = start.elapsed();

        assert!(resp.durable);
        assert!(
            write_latency.as_millis() < 250,
            "update_context took {write_latency:?} on warm path — should be <250ms"
        );

        // Workers drain the queues asynchronously.
        for _ in 0..100 {
            if svc.pipeline().backlog() == 0 {
                break;
            }
            tokio::time::sleep(std::time::Duration::from_millis(20)).await;
        }
        assert_eq!(
            svc.pipeline().backlog(),
            0,
            "pipeline backlog should drain within 2s"
        );
    }

    #[tokio::test]
    async fn bulk_update_context_rejects_mismatched_session() {
        let (svc, _test_dir) = make_service("bulkmis").await;
        let session_id = svc.inner().create_session(None, None).await.unwrap();

        let mut item = good_request();
        item.session_id = Uuid::new_v4(); // intentionally different
        let err = svc
            .bulk_update_context(BulkUpdateContextRequest {
                session_id,
                updates: vec![item],
            })
            .await
            .unwrap_err();
        assert!(matches!(err, SystemError::InvalidArgument(ref m) if m.contains("does not match")));
    }
}