1use std::collections::HashSet;
22use std::sync::Arc;
23
24use async_trait::async_trait;
25use chrono::Utc;
26use post_cortex_core::core::context_update::{ContextUpdate, EntityRelationship, TypedEntity};
27use post_cortex_core::core::error::SystemError;
28use post_cortex_core::services::{
29 AdminRequest, AdminResponse, AssembleContextRequest, AssembleContextResponse,
30 BulkUpdateContextRequest, BulkUpdateContextResponse, HealthReport, ManageEntityRequest,
31 ManageEntityResponse, ManageSessionRequest, ManageSessionResponse, ManageWorkspaceRequest,
32 ManageWorkspaceResponse, PostCortexService, QueryContextRequest, QueryContextResponse,
33 SemanticSearchRequest, SemanticSearchResponse, StructuredSummaryRequest,
34 StructuredSummaryResponse, UpdateContextRequest, UpdateContextResponse,
35};
36use tracing::warn;
37use uuid::Uuid;
38
39use crate::memory_system::ConversationMemorySystem;
40use crate::pipeline::{
41 EmbeddingWorkItem, GraphWorkItem, Pipeline, PipelineConfig, PipelineError, SummaryWorkItem,
42};
43
44pub struct MemoryServiceImpl {
50 system: Arc<ConversationMemorySystem>,
51 pipeline: Arc<Pipeline>,
52}
53
54impl MemoryServiceImpl {
55 #[must_use]
59 pub fn new(system: Arc<ConversationMemorySystem>) -> Self {
60 Self::with_pipeline_config(system, PipelineConfig::default())
61 }
62
63 #[must_use]
66 pub fn with_pipeline_config(
67 system: Arc<ConversationMemorySystem>,
68 config: PipelineConfig,
69 ) -> Self {
70 let pipeline = Arc::new(Pipeline::start(config, Arc::clone(&system)));
71 Self { system, pipeline }
72 }
73
74 #[must_use]
78 pub fn pipeline(&self) -> &Arc<Pipeline> {
79 &self.pipeline
80 }
81
82 #[must_use]
86 pub fn inner(&self) -> &Arc<ConversationMemorySystem> {
87 &self.system
88 }
89
90 fn not_yet_wired<T>(op: &'static str) -> Result<T, SystemError> {
91 Err(SystemError::Internal(format!(
92 "PostCortexService::{op} is not yet wired — migration lands in Phase 6 (MCP) / Phase 7 (daemon). \
93 Use ConversationMemorySystem directly until then."
94 )))
95 }
96}
97
98#[async_trait]
99impl PostCortexService for MemoryServiceImpl {
100 #[tracing::instrument(skip(self), name = "post_cortex.health")]
101 async fn health(&self) -> Result<HealthReport, SystemError> {
102 let health = self.system.get_system_health();
103 Ok(HealthReport {
104 status: if health.circuit_breaker_open {
105 "degraded".to_string()
106 } else {
107 "ok".to_string()
108 },
109 active_sessions: health.active_sessions,
110 memory_usage_bytes: 0,
113 pipeline_backlog: self.pipeline.backlog(),
114 uptime_seconds: health.uptime_seconds,
115 })
116 }
117
118 #[tracing::instrument(
119 skip(self, req),
120 fields(
121 session_id = %req.session_id,
122 interaction_type = ?req.interaction_type,
123 entities = req.entities.len(),
124 relations = req.relations.len(),
125 ),
126 name = "post_cortex.update_context",
127 )]
128 async fn update_context(
129 &self,
130 req: UpdateContextRequest,
131 ) -> Result<UpdateContextResponse, SystemError> {
132 validate_update_request(&req)?;
133
134 let description = build_description(&req);
135 let context_update = build_context_update(&req);
136 let metadata = serde_json::to_value(&context_update)
137 .expect("ContextUpdate serialization cannot fail");
138 let session_id = req.session_id;
139
140 let entry_id_str = self
141 .system
142 .add_incremental_update(session_id, description.clone(), Some(metadata))
143 .await
144 .map_err(SystemError::Internal)?;
145
146 let entry_id = Uuid::parse_str(&entry_id_str).map_err(|e| {
147 SystemError::Internal(format!(
148 "storage returned non-UUID entry id {entry_id_str:?}: {e}"
149 ))
150 })?;
151
152 submit_derived_work(
158 &self.pipeline,
159 session_id,
160 entry_id,
161 &description,
162 context_update,
163 );
164
165 Ok(UpdateContextResponse {
166 entry_id,
167 session_id,
168 persisted_at: Utc::now(),
169 durable: true,
170 })
171 }
172
173 #[tracing::instrument(
174 skip(self, req),
175 fields(session_id = %req.session_id, batch_size = req.updates.len()),
176 name = "post_cortex.bulk_update_context",
177 )]
178 async fn bulk_update_context(
179 &self,
180 req: BulkUpdateContextRequest,
181 ) -> Result<BulkUpdateContextResponse, SystemError> {
182 for (i, item) in req.updates.iter().enumerate() {
187 if item.session_id != req.session_id {
188 return Err(SystemError::InvalidArgument(format!(
189 "bulk_update_context: updates[{i}].session_id {} does not match request session_id {}",
190 item.session_id, req.session_id
191 )));
192 }
193 validate_update_request(item).map_err(|e| match e {
194 SystemError::InvalidArgument(msg) => {
195 SystemError::InvalidArgument(format!("updates[{i}]: {msg}"))
196 }
197 other => other,
198 })?;
199 }
200
201 let mut entry_ids = Vec::with_capacity(req.updates.len());
202 for (i, item) in req.updates.iter().enumerate() {
203 let description = build_description(item);
204 let context_update = build_context_update(item);
205 let metadata = serde_json::to_value(&context_update)
206 .expect("ContextUpdate serialization cannot fail");
207 let entry_id_str = self
208 .system
209 .add_incremental_update(item.session_id, description.clone(), Some(metadata))
210 .await
211 .map_err(|e| SystemError::Internal(format!("updates[{i}]: {e}")))?;
212 let entry_id = Uuid::parse_str(&entry_id_str).map_err(|e| {
213 SystemError::Internal(format!(
214 "updates[{i}]: storage returned non-UUID entry id {entry_id_str:?}: {e}"
215 ))
216 })?;
217 entry_ids.push(entry_id);
218 submit_derived_work(
219 &self.pipeline,
220 item.session_id,
221 entry_id,
222 &description,
223 context_update,
224 );
225 }
226
227 Ok(BulkUpdateContextResponse {
228 entry_ids,
229 persisted_at: Utc::now(),
230 durable: true,
231 })
232 }
233
234 #[tracing::instrument(skip(self, _req), name = "post_cortex.semantic_search")]
235 async fn semantic_search(
236 &self,
237 _req: SemanticSearchRequest,
238 ) -> Result<SemanticSearchResponse, SystemError> {
239 Self::not_yet_wired("semantic_search")
240 }
241
242 #[tracing::instrument(skip(self, _req), name = "post_cortex.query_context")]
243 async fn query_context(
244 &self,
245 _req: QueryContextRequest,
246 ) -> Result<QueryContextResponse, SystemError> {
247 Self::not_yet_wired("query_context")
248 }
249
250 #[tracing::instrument(skip(self, _req), name = "post_cortex.assemble_context")]
251 async fn assemble_context(
252 &self,
253 _req: AssembleContextRequest,
254 ) -> Result<AssembleContextResponse, SystemError> {
255 Self::not_yet_wired("assemble_context")
256 }
257
258 #[tracing::instrument(skip(self, _req), name = "post_cortex.manage_session")]
259 async fn manage_session(
260 &self,
261 _req: ManageSessionRequest,
262 ) -> Result<ManageSessionResponse, SystemError> {
263 Self::not_yet_wired("manage_session")
264 }
265
266 #[tracing::instrument(skip(self, _req), name = "post_cortex.manage_workspace")]
267 async fn manage_workspace(
268 &self,
269 _req: ManageWorkspaceRequest,
270 ) -> Result<ManageWorkspaceResponse, SystemError> {
271 Self::not_yet_wired("manage_workspace")
272 }
273
274 #[tracing::instrument(skip(self, _req), name = "post_cortex.manage_entity")]
275 async fn manage_entity(
276 &self,
277 _req: ManageEntityRequest,
278 ) -> Result<ManageEntityResponse, SystemError> {
279 Self::not_yet_wired("manage_entity")
280 }
281
282 #[tracing::instrument(skip(self, _req), name = "post_cortex.get_structured_summary")]
283 async fn get_structured_summary(
284 &self,
285 _req: StructuredSummaryRequest,
286 ) -> Result<StructuredSummaryResponse, SystemError> {
287 Self::not_yet_wired("get_structured_summary")
288 }
289
290 #[tracing::instrument(skip(self, _req), name = "post_cortex.admin")]
291 async fn admin(&self, _req: AdminRequest) -> Result<AdminResponse, SystemError> {
292 Self::not_yet_wired("admin")
293 }
294}
295
296fn validate_update_request(req: &UpdateContextRequest) -> Result<(), SystemError> {
308 if req.content.title.trim().is_empty() && req.content.description.trim().is_empty() {
309 return Err(SystemError::InvalidArgument(
310 "update_context: title and description are both empty — provide at least one".into(),
311 ));
312 }
313 if req.entities.is_empty() {
314 return Err(SystemError::InvalidArgument(
315 "update_context: entities must not be empty".into(),
316 ));
317 }
318 if req.relations.is_empty() {
319 return Err(SystemError::InvalidArgument(
320 "update_context: relations must not be empty".into(),
321 ));
322 }
323
324 let entity_names: HashSet<&str> = req.entities.iter().map(|e| e.name.as_str()).collect();
325 for (i, rel) in req.relations.iter().enumerate() {
326 if rel.from_entity.is_empty() {
327 return Err(SystemError::InvalidArgument(format!(
328 "relation[{i}]: from_entity must not be empty"
329 )));
330 }
331 if rel.to_entity.is_empty() {
332 return Err(SystemError::InvalidArgument(format!(
333 "relation[{i}]: to_entity must not be empty"
334 )));
335 }
336 if rel.from_entity == rel.to_entity {
337 return Err(SystemError::InvalidArgument(format!(
338 "relation[{i}]: self-relations are not allowed (from_entity == to_entity == {:?})",
339 rel.from_entity
340 )));
341 }
342 if rel.context.trim().is_empty() {
343 return Err(SystemError::InvalidArgument(format!(
344 "relation[{i}]: context must not be empty — every relation requires an explanation"
345 )));
346 }
347 if !entity_names.contains(rel.from_entity.as_str()) {
348 return Err(SystemError::InvalidArgument(format!(
349 "relation[{i}]: from_entity {:?} is not declared in the entities list",
350 rel.from_entity
351 )));
352 }
353 if !entity_names.contains(rel.to_entity.as_str()) {
354 return Err(SystemError::InvalidArgument(format!(
355 "relation[{i}]: to_entity {:?} is not declared in the entities list",
356 rel.to_entity
357 )));
358 }
359 }
360
361 Ok(())
362}
363
364fn build_description(req: &UpdateContextRequest) -> String {
367 if req.content.description.is_empty() {
368 req.content.title.clone()
369 } else if req.content.title.is_empty() {
370 req.content.description.clone()
371 } else {
372 format!("{}\n{}", req.content.title, req.content.description)
373 }
374}
375
376fn build_context_update(req: &UpdateContextRequest) -> ContextUpdate {
380 let typed_entities: Vec<TypedEntity> = req
381 .entities
382 .iter()
383 .map(|e| TypedEntity {
384 name: e.name.clone(),
385 entity_type: e.entity_type.clone(),
386 })
387 .collect();
388 let creates_entities: Vec<String> = req.entities.iter().map(|e| e.name.clone()).collect();
389 let creates_relationships: Vec<EntityRelationship> = req.relations.clone();
390 let related_code = req.code_reference.clone();
391
392 ContextUpdate {
393 id: Uuid::new_v4(),
394 timestamp: Utc::now(),
395 update_type: req.interaction_type.clone(),
396 content: req.content.clone(),
397 related_code,
398 parent_update: None,
399 user_marked_important: false,
400 creates_entities,
401 creates_relationships,
402 references_entities: Vec::new(),
403 typed_entities,
404 }
405}
406
407fn submit_derived_work(
416 pipeline: &Pipeline,
417 session_id: Uuid,
418 entry_id: Uuid,
419 text: &str,
420 update: ContextUpdate,
421) {
422 if let Err(e) = pipeline.submit_embedding(EmbeddingWorkItem {
423 session_id,
424 entry_id,
425 text: text.to_string(),
426 }) {
427 log_pipeline_submit("embedding", session_id, entry_id, e);
428 }
429 if let Err(e) = pipeline.submit_graph(GraphWorkItem::ApplyUpdate {
430 session_id,
431 update,
432 }) {
433 log_pipeline_submit("graph", session_id, entry_id, e);
434 }
435 if let Err(e) = pipeline.submit_summary(SummaryWorkItem { session_id }) {
436 log_pipeline_submit("summary", session_id, entry_id, e);
437 }
438}
439
440fn log_pipeline_submit(queue: &str, session_id: Uuid, entry_id: Uuid, err: PipelineError) {
441 warn!(
442 queue,
443 %session_id,
444 %entry_id,
445 error = %err,
446 "pipeline submission failed (non-fatal — legacy in-system spawn covers the work)"
447 );
448}
449
450#[cfg(test)]
451mod tests {
452 use super::*;
453 use crate::memory_system::SystemConfig;
454 use chrono::Utc;
455 use post_cortex_core::core::context_update::{
456 EntityData, EntityType, RelationType, UpdateContent, UpdateType,
457 };
458
459 fn entity(name: &str) -> EntityData {
460 EntityData {
461 name: name.to_string(),
462 entity_type: EntityType::Concept,
463 first_mentioned: Utc::now(),
464 last_mentioned: Utc::now(),
465 mention_count: 1,
466 importance_score: 1.0,
467 description: None,
468 }
469 }
470
471 fn relation(from: &str, to: &str) -> EntityRelationship {
472 EntityRelationship {
473 from_entity: from.to_string(),
474 to_entity: to.to_string(),
475 relation_type: RelationType::RelatedTo,
476 context: "test relation".to_string(),
477 }
478 }
479
480 fn good_request() -> UpdateContextRequest {
481 UpdateContextRequest {
482 session_id: Uuid::new_v4(),
483 interaction_type: UpdateType::ConceptDefined,
484 content: UpdateContent {
485 title: "Some concept".into(),
486 description: "A short definition".into(),
487 details: vec![],
488 examples: vec![],
489 implications: vec![],
490 },
491 entities: vec![entity("Foo"), entity("Bar")],
492 relations: vec![relation("Foo", "Bar")],
493 code_reference: None,
494 }
495 }
496
497 #[tokio::test]
498 async fn trait_is_object_safe() {
499 fn _accept_dyn(_svc: Arc<dyn PostCortexService>) {}
500 }
501
502 #[test]
503 fn validation_rejects_empty_title_and_description() {
504 let mut req = good_request();
505 req.content.title = String::new();
506 req.content.description = String::new();
507 let err = validate_update_request(&req).unwrap_err();
508 assert!(matches!(err, SystemError::InvalidArgument(ref m) if m.contains("both empty")));
509 }
510
511 #[test]
512 fn validation_rejects_empty_entities() {
513 let mut req = good_request();
514 req.entities = vec![];
515 assert!(matches!(
516 validate_update_request(&req),
517 Err(SystemError::InvalidArgument(_))
518 ));
519 }
520
521 #[test]
522 fn validation_rejects_empty_relations() {
523 let mut req = good_request();
524 req.relations = vec![];
525 assert!(matches!(
526 validate_update_request(&req),
527 Err(SystemError::InvalidArgument(_))
528 ));
529 }
530
531 #[test]
532 fn validation_rejects_self_relation() {
533 let mut req = good_request();
534 req.relations = vec![relation("Foo", "Foo")];
535 assert!(matches!(
536 validate_update_request(&req),
537 Err(SystemError::InvalidArgument(ref m)) if m.contains("self-relations")
538 ));
539 }
540
541 #[test]
542 fn validation_rejects_dangling_relation_endpoint() {
543 let mut req = good_request();
544 req.relations = vec![relation("Foo", "Ghost")];
545 assert!(matches!(
546 validate_update_request(&req),
547 Err(SystemError::InvalidArgument(ref m)) if m.contains("Ghost")
548 ));
549 }
550
551 #[test]
552 fn validation_rejects_empty_relation_context() {
553 let mut req = good_request();
554 req.relations[0].context = " ".into();
555 assert!(matches!(
556 validate_update_request(&req),
557 Err(SystemError::InvalidArgument(ref m)) if m.contains("context must not be empty")
558 ));
559 }
560
561 #[test]
562 fn validation_accepts_good_request() {
563 assert!(validate_update_request(&good_request()).is_ok());
564 }
565
566 #[test]
567 fn description_joins_title_and_body() {
568 let req = good_request();
569 assert_eq!(build_description(&req), "Some concept\nA short definition");
570 }
571
572 #[test]
573 fn description_falls_back_when_one_side_empty() {
574 let mut req = good_request();
575 req.content.description = String::new();
576 assert_eq!(build_description(&req), "Some concept");
577
578 req.content.title = String::new();
579 req.content.description = "Body only".into();
580 assert_eq!(build_description(&req), "Body only");
581 }
582
583 #[test]
584 fn metadata_keeps_creates_entities_in_sync_with_typed_entities() {
585 let req = good_request();
586 let update = build_context_update(&req);
587 let meta = serde_json::to_value(&update).unwrap();
588 let names = meta["creates_entities"].as_array().unwrap();
589 let typed = meta["typed_entities"].as_array().unwrap();
590 assert_eq!(names.len(), typed.len());
591 assert_eq!(names.len(), req.entities.len());
592 for (i, e) in req.entities.iter().enumerate() {
593 assert_eq!(names[i].as_str().unwrap(), e.name);
594 assert_eq!(typed[i]["name"].as_str().unwrap(), e.name);
595 }
596 }
597
598 async fn make_service(suffix: &str) -> (MemoryServiceImpl, String) {
599 let test_dir = format!(
600 "./test_data_memservice_{}_{}",
601 suffix,
602 std::time::SystemTime::now()
603 .duration_since(std::time::UNIX_EPOCH)
604 .unwrap()
605 .as_nanos()
606 );
607 std::fs::create_dir_all(&test_dir).unwrap();
608 let config = SystemConfig {
609 data_directory: test_dir.clone(),
610 ..Default::default()
611 };
612 let system = Arc::new(ConversationMemorySystem::new(config).await.unwrap());
613 (MemoryServiceImpl::new(system), test_dir)
614 }
615
616 #[tokio::test]
617 async fn health_returns_ok_status() {
618 let (svc, test_dir) = make_service("health").await;
619 let report = svc.health().await.unwrap();
620 assert!(report.status == "ok" || report.status == "degraded");
621 std::fs::remove_dir_all(&test_dir).unwrap();
622 }
623
624 #[tokio::test]
625 async fn update_context_persists_and_returns_entry_id() {
626 let (svc, test_dir) = make_service("update").await;
627 let session_id = svc.inner().create_session(None, None).await.unwrap();
628
629 let mut req = good_request();
630 req.session_id = session_id;
631 let resp = svc.update_context(req).await.unwrap();
632
633 assert_eq!(resp.session_id, session_id);
634 assert!(resp.durable);
635 std::fs::remove_dir_all(&test_dir).unwrap();
636 }
637
638 #[tokio::test]
639 async fn update_context_rejects_invalid_input_with_invalid_argument() {
640 let (svc, test_dir) = make_service("invalid").await;
641 let session_id = svc.inner().create_session(None, None).await.unwrap();
642
643 let mut req = good_request();
644 req.session_id = session_id;
645 req.entities = vec![]; let err = svc.update_context(req).await.unwrap_err();
647 assert!(matches!(err, SystemError::InvalidArgument(_)));
648 std::fs::remove_dir_all(&test_dir).unwrap();
649 }
650
651 #[tokio::test]
652 async fn bulk_update_context_persists_every_item() {
653 let (svc, test_dir) = make_service("bulk").await;
654 let session_id = svc.inner().create_session(None, None).await.unwrap();
655
656 let mut a = good_request();
657 a.session_id = session_id;
658 a.content.title = "First".into();
659 let mut b = good_request();
660 b.session_id = session_id;
661 b.content.title = "Second".into();
662
663 let resp = svc
664 .bulk_update_context(BulkUpdateContextRequest {
665 session_id,
666 updates: vec![a, b],
667 })
668 .await
669 .unwrap();
670 assert_eq!(resp.entry_ids.len(), 2);
671 assert!(resp.durable);
672 std::fs::remove_dir_all(&test_dir).unwrap();
673 }
674
675 #[tokio::test]
676 async fn update_context_returns_fast_then_pipeline_drains_in_background() {
677 let (svc, test_dir) = make_service("nonblocking").await;
687 let session_id = svc.inner().create_session(None, None).await.unwrap();
688
689 let mut warmup = good_request();
691 warmup.session_id = session_id;
692 warmup.content.title = "warmup".into();
693 let _ = svc.update_context(warmup).await.unwrap();
694
695 for _ in 0..100 {
697 if svc.pipeline().backlog() == 0 {
698 break;
699 }
700 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
701 }
702
703 let mut req = good_request();
705 req.session_id = session_id;
706 let start = std::time::Instant::now();
707 let resp = svc.update_context(req).await.unwrap();
708 let write_latency = start.elapsed();
709
710 assert!(resp.durable);
711 assert!(
712 write_latency.as_millis() < 250,
713 "update_context took {write_latency:?} on warm path — should be <250ms"
714 );
715
716 for _ in 0..100 {
718 if svc.pipeline().backlog() == 0 {
719 break;
720 }
721 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
722 }
723 assert_eq!(
724 svc.pipeline().backlog(),
725 0,
726 "pipeline backlog should drain within 2s"
727 );
728
729 std::fs::remove_dir_all(&test_dir).unwrap();
730 }
731
732 #[tokio::test]
733 async fn bulk_update_context_rejects_mismatched_session() {
734 let (svc, test_dir) = make_service("bulkmis").await;
735 let session_id = svc.inner().create_session(None, None).await.unwrap();
736
737 let mut item = good_request();
738 item.session_id = Uuid::new_v4(); let err = svc
740 .bulk_update_context(BulkUpdateContextRequest {
741 session_id,
742 updates: vec![item],
743 })
744 .await
745 .unwrap_err();
746 assert!(matches!(err, SystemError::InvalidArgument(ref m) if m.contains("does not match")));
747 std::fs::remove_dir_all(&test_dir).unwrap();
748 }
749}