1use std::collections::HashSet;
22use std::sync::Arc;
23
24use async_trait::async_trait;
25use chrono::Utc;
26use post_cortex_core::core::context_update::{ContextUpdate, EntityRelationship, TypedEntity};
27use post_cortex_core::core::error::SystemError;
28use post_cortex_core::services::{
29 AdminRequest, AdminResponse, AssembleContextRequest, AssembleContextResponse,
30 BulkUpdateContextRequest, BulkUpdateContextResponse, HealthReport, ManageEntityRequest,
31 ManageEntityResponse, ManageSessionRequest, ManageSessionResponse, ManageWorkspaceRequest,
32 ManageWorkspaceResponse, PostCortexService, QueryContextRequest, QueryContextResponse,
33 SemanticSearchRequest, SemanticSearchResponse, StructuredSummaryRequest,
34 StructuredSummaryResponse, UpdateContextRequest, UpdateContextResponse,
35};
36use tracing::warn;
37use uuid::Uuid;
38
39use crate::memory_system::ConversationMemorySystem;
40use crate::pipeline::{
41 EmbeddingWorkItem, GraphWorkItem, Pipeline, PipelineConfig, PipelineError, SummaryWorkItem,
42};
43
44pub struct MemoryServiceImpl {
50 system: Arc<ConversationMemorySystem>,
51 pipeline: Arc<Pipeline>,
52}
53
54impl MemoryServiceImpl {
55 #[must_use]
59 pub fn new(system: Arc<ConversationMemorySystem>) -> Self {
60 Self::with_pipeline_config(system, PipelineConfig::default())
61 }
62
63 #[must_use]
66 pub fn with_pipeline_config(
67 system: Arc<ConversationMemorySystem>,
68 config: PipelineConfig,
69 ) -> Self {
70 let pipeline = Arc::new(Pipeline::start(config, Arc::clone(&system)));
71 Self { system, pipeline }
72 }
73
74 #[must_use]
78 pub fn pipeline(&self) -> &Arc<Pipeline> {
79 &self.pipeline
80 }
81
82 #[must_use]
86 pub fn inner(&self) -> &Arc<ConversationMemorySystem> {
87 &self.system
88 }
89
90 fn not_yet_wired<T>(op: &'static str) -> Result<T, SystemError> {
91 Err(SystemError::Internal(format!(
92 "PostCortexService::{op} is not yet wired — migration lands in Phase 6 (MCP) / Phase 7 (daemon). \
93 Use ConversationMemorySystem directly until then."
94 )))
95 }
96}
97
98#[async_trait]
99impl PostCortexService for MemoryServiceImpl {
100 #[tracing::instrument(skip(self), name = "post_cortex.health")]
101 async fn health(&self) -> Result<HealthReport, SystemError> {
102 let health = self.system.get_system_health();
103 Ok(HealthReport {
104 status: if health.circuit_breaker_open {
105 "degraded".to_string()
106 } else {
107 "ok".to_string()
108 },
109 active_sessions: health.active_sessions,
110 memory_usage_bytes: 0,
113 pipeline_backlog: self.pipeline.backlog(),
114 uptime_seconds: health.uptime_seconds,
115 })
116 }
117
118 #[tracing::instrument(
119 skip(self, req),
120 fields(
121 session_id = %req.session_id,
122 interaction_type = ?req.interaction_type,
123 entities = req.entities.len(),
124 relations = req.relations.len(),
125 ),
126 name = "post_cortex.update_context",
127 )]
128 async fn update_context(
129 &self,
130 req: UpdateContextRequest,
131 ) -> Result<UpdateContextResponse, SystemError> {
132 validate_update_request(&req)?;
133
134 let description = build_description(&req);
135 let context_update = build_context_update(&req);
136 let metadata =
137 serde_json::to_value(&context_update).expect("ContextUpdate serialization cannot fail");
138 let session_id = req.session_id;
139
140 let entry_id_str = self
141 .system
142 .add_incremental_update(session_id, description.clone(), Some(metadata))
143 .await
144 .map_err(SystemError::Internal)?;
145
146 let entry_id = Uuid::parse_str(&entry_id_str).map_err(|e| {
147 SystemError::Internal(format!(
148 "storage returned non-UUID entry id {entry_id_str:?}: {e}"
149 ))
150 })?;
151
152 submit_derived_work(
158 &self.pipeline,
159 session_id,
160 entry_id,
161 &description,
162 context_update,
163 );
164
165 Ok(UpdateContextResponse {
166 entry_id,
167 session_id,
168 persisted_at: Utc::now(),
169 durable: true,
170 })
171 }
172
173 #[tracing::instrument(
174 skip(self, req),
175 fields(session_id = %req.session_id, batch_size = req.updates.len()),
176 name = "post_cortex.bulk_update_context",
177 )]
178 async fn bulk_update_context(
179 &self,
180 req: BulkUpdateContextRequest,
181 ) -> Result<BulkUpdateContextResponse, SystemError> {
182 for (i, item) in req.updates.iter().enumerate() {
187 if item.session_id != req.session_id {
188 return Err(SystemError::InvalidArgument(format!(
189 "bulk_update_context: updates[{i}].session_id {} does not match request session_id {}",
190 item.session_id, req.session_id
191 )));
192 }
193 validate_update_request(item).map_err(|e| match e {
194 SystemError::InvalidArgument(msg) => {
195 SystemError::InvalidArgument(format!("updates[{i}]: {msg}"))
196 }
197 other => other,
198 })?;
199 }
200
201 let mut entry_ids = Vec::with_capacity(req.updates.len());
202 for (i, item) in req.updates.iter().enumerate() {
203 let description = build_description(item);
204 let context_update = build_context_update(item);
205 let metadata = serde_json::to_value(&context_update)
206 .expect("ContextUpdate serialization cannot fail");
207 let entry_id_str = self
208 .system
209 .add_incremental_update(item.session_id, description.clone(), Some(metadata))
210 .await
211 .map_err(|e| SystemError::Internal(format!("updates[{i}]: {e}")))?;
212 let entry_id = Uuid::parse_str(&entry_id_str).map_err(|e| {
213 SystemError::Internal(format!(
214 "updates[{i}]: storage returned non-UUID entry id {entry_id_str:?}: {e}"
215 ))
216 })?;
217 entry_ids.push(entry_id);
218 submit_derived_work(
219 &self.pipeline,
220 item.session_id,
221 entry_id,
222 &description,
223 context_update,
224 );
225 }
226
227 Ok(BulkUpdateContextResponse {
228 entry_ids,
229 persisted_at: Utc::now(),
230 durable: true,
231 })
232 }
233
234 #[tracing::instrument(skip(self, _req), name = "post_cortex.semantic_search")]
235 async fn semantic_search(
236 &self,
237 _req: SemanticSearchRequest,
238 ) -> Result<SemanticSearchResponse, SystemError> {
239 Self::not_yet_wired("semantic_search")
240 }
241
242 #[tracing::instrument(skip(self, _req), name = "post_cortex.query_context")]
243 async fn query_context(
244 &self,
245 _req: QueryContextRequest,
246 ) -> Result<QueryContextResponse, SystemError> {
247 Self::not_yet_wired("query_context")
248 }
249
250 #[tracing::instrument(skip(self, _req), name = "post_cortex.assemble_context")]
251 async fn assemble_context(
252 &self,
253 _req: AssembleContextRequest,
254 ) -> Result<AssembleContextResponse, SystemError> {
255 Self::not_yet_wired("assemble_context")
256 }
257
258 #[tracing::instrument(skip(self, _req), name = "post_cortex.manage_session")]
259 async fn manage_session(
260 &self,
261 _req: ManageSessionRequest,
262 ) -> Result<ManageSessionResponse, SystemError> {
263 Self::not_yet_wired("manage_session")
264 }
265
266 #[tracing::instrument(skip(self, _req), name = "post_cortex.manage_workspace")]
267 async fn manage_workspace(
268 &self,
269 _req: ManageWorkspaceRequest,
270 ) -> Result<ManageWorkspaceResponse, SystemError> {
271 Self::not_yet_wired("manage_workspace")
272 }
273
274 #[tracing::instrument(skip(self, _req), name = "post_cortex.manage_entity")]
275 async fn manage_entity(
276 &self,
277 _req: ManageEntityRequest,
278 ) -> Result<ManageEntityResponse, SystemError> {
279 Self::not_yet_wired("manage_entity")
280 }
281
282 #[tracing::instrument(skip(self, _req), name = "post_cortex.get_structured_summary")]
283 async fn get_structured_summary(
284 &self,
285 _req: StructuredSummaryRequest,
286 ) -> Result<StructuredSummaryResponse, SystemError> {
287 Self::not_yet_wired("get_structured_summary")
288 }
289
290 #[tracing::instrument(skip(self, _req), name = "post_cortex.admin")]
291 async fn admin(&self, _req: AdminRequest) -> Result<AdminResponse, SystemError> {
292 Self::not_yet_wired("admin")
293 }
294}
295
296fn validate_update_request(req: &UpdateContextRequest) -> Result<(), SystemError> {
308 if req.content.title.trim().is_empty() && req.content.description.trim().is_empty() {
309 return Err(SystemError::InvalidArgument(
310 "update_context: title and description are both empty — provide at least one".into(),
311 ));
312 }
313 if req.entities.is_empty() {
314 return Err(SystemError::InvalidArgument(
315 "update_context: entities must not be empty".into(),
316 ));
317 }
318 if req.relations.is_empty() {
319 return Err(SystemError::InvalidArgument(
320 "update_context: relations must not be empty".into(),
321 ));
322 }
323
324 let entity_names: HashSet<&str> = req.entities.iter().map(|e| e.name.as_str()).collect();
325 for (i, rel) in req.relations.iter().enumerate() {
326 if rel.from_entity.is_empty() {
327 return Err(SystemError::InvalidArgument(format!(
328 "relation[{i}]: from_entity must not be empty"
329 )));
330 }
331 if rel.to_entity.is_empty() {
332 return Err(SystemError::InvalidArgument(format!(
333 "relation[{i}]: to_entity must not be empty"
334 )));
335 }
336 if rel.from_entity == rel.to_entity {
337 return Err(SystemError::InvalidArgument(format!(
338 "relation[{i}]: self-relations are not allowed (from_entity == to_entity == {:?})",
339 rel.from_entity
340 )));
341 }
342 if rel.context.trim().is_empty() {
343 return Err(SystemError::InvalidArgument(format!(
344 "relation[{i}]: context must not be empty — every relation requires an explanation"
345 )));
346 }
347 if !entity_names.contains(rel.from_entity.as_str()) {
348 return Err(SystemError::InvalidArgument(format!(
349 "relation[{i}]: from_entity {:?} is not declared in the entities list",
350 rel.from_entity
351 )));
352 }
353 if !entity_names.contains(rel.to_entity.as_str()) {
354 return Err(SystemError::InvalidArgument(format!(
355 "relation[{i}]: to_entity {:?} is not declared in the entities list",
356 rel.to_entity
357 )));
358 }
359 }
360
361 Ok(())
362}
363
364fn build_description(req: &UpdateContextRequest) -> String {
367 if req.content.description.is_empty() {
368 req.content.title.clone()
369 } else if req.content.title.is_empty() {
370 req.content.description.clone()
371 } else {
372 format!("{}\n{}", req.content.title, req.content.description)
373 }
374}
375
376fn build_context_update(req: &UpdateContextRequest) -> ContextUpdate {
380 let typed_entities: Vec<TypedEntity> = req
381 .entities
382 .iter()
383 .map(|e| TypedEntity {
384 name: e.name.clone(),
385 entity_type: e.entity_type.clone(),
386 })
387 .collect();
388 let creates_entities: Vec<String> = req.entities.iter().map(|e| e.name.clone()).collect();
389 let creates_relationships: Vec<EntityRelationship> = req.relations.clone();
390 let related_code = req.code_reference.clone();
391
392 ContextUpdate {
393 id: Uuid::new_v4(),
394 timestamp: Utc::now(),
395 update_type: req.interaction_type.clone(),
396 content: req.content.clone(),
397 related_code,
398 parent_update: None,
399 user_marked_important: false,
400 creates_entities,
401 creates_relationships,
402 references_entities: Vec::new(),
403 typed_entities,
404 }
405}
406
407fn submit_derived_work(
416 pipeline: &Pipeline,
417 session_id: Uuid,
418 entry_id: Uuid,
419 text: &str,
420 update: ContextUpdate,
421) {
422 if let Err(e) = pipeline.submit_embedding(EmbeddingWorkItem {
423 session_id,
424 entry_id,
425 text: text.to_string(),
426 }) {
427 log_pipeline_submit("embedding", session_id, entry_id, e);
428 }
429 if let Err(e) = pipeline.submit_graph(GraphWorkItem::ApplyUpdate { session_id, update }) {
430 log_pipeline_submit("graph", session_id, entry_id, e);
431 }
432 if let Err(e) = pipeline.submit_summary(SummaryWorkItem { session_id }) {
433 log_pipeline_submit("summary", session_id, entry_id, e);
434 }
435}
436
437fn log_pipeline_submit(queue: &str, session_id: Uuid, entry_id: Uuid, err: PipelineError) {
438 warn!(
439 queue,
440 %session_id,
441 %entry_id,
442 error = %err,
443 "pipeline submission failed (non-fatal — legacy in-system spawn covers the work)"
444 );
445}
446
447#[cfg(test)]
448mod tests {
449 use super::*;
450 use crate::memory_system::SystemConfig;
451 use chrono::Utc;
452 use post_cortex_core::core::context_update::{
453 EntityData, EntityType, RelationType, UpdateContent, UpdateType,
454 };
455
456 fn entity(name: &str) -> EntityData {
457 EntityData {
458 name: name.to_string(),
459 entity_type: EntityType::Concept,
460 first_mentioned: Utc::now(),
461 last_mentioned: Utc::now(),
462 mention_count: 1,
463 importance_score: 1.0,
464 description: None,
465 }
466 }
467
468 fn relation(from: &str, to: &str) -> EntityRelationship {
469 EntityRelationship {
470 from_entity: from.to_string(),
471 to_entity: to.to_string(),
472 relation_type: RelationType::RelatedTo,
473 context: "test relation".to_string(),
474 }
475 }
476
477 fn good_request() -> UpdateContextRequest {
478 UpdateContextRequest {
479 session_id: Uuid::new_v4(),
480 interaction_type: UpdateType::ConceptDefined,
481 content: UpdateContent {
482 title: "Some concept".into(),
483 description: "A short definition".into(),
484 details: vec![],
485 examples: vec![],
486 implications: vec![],
487 },
488 entities: vec![entity("Foo"), entity("Bar")],
489 relations: vec![relation("Foo", "Bar")],
490 code_reference: None,
491 }
492 }
493
494 #[tokio::test]
495 async fn trait_is_object_safe() {
496 fn _accept_dyn(_svc: Arc<dyn PostCortexService>) {}
497 }
498
499 #[test]
500 fn validation_rejects_empty_title_and_description() {
501 let mut req = good_request();
502 req.content.title = String::new();
503 req.content.description = String::new();
504 let err = validate_update_request(&req).unwrap_err();
505 assert!(matches!(err, SystemError::InvalidArgument(ref m) if m.contains("both empty")));
506 }
507
508 #[test]
509 fn validation_rejects_empty_entities() {
510 let mut req = good_request();
511 req.entities = vec![];
512 assert!(matches!(
513 validate_update_request(&req),
514 Err(SystemError::InvalidArgument(_))
515 ));
516 }
517
518 #[test]
519 fn validation_rejects_empty_relations() {
520 let mut req = good_request();
521 req.relations = vec![];
522 assert!(matches!(
523 validate_update_request(&req),
524 Err(SystemError::InvalidArgument(_))
525 ));
526 }
527
528 #[test]
529 fn validation_rejects_self_relation() {
530 let mut req = good_request();
531 req.relations = vec![relation("Foo", "Foo")];
532 assert!(matches!(
533 validate_update_request(&req),
534 Err(SystemError::InvalidArgument(ref m)) if m.contains("self-relations")
535 ));
536 }
537
538 #[test]
539 fn validation_rejects_dangling_relation_endpoint() {
540 let mut req = good_request();
541 req.relations = vec![relation("Foo", "Ghost")];
542 assert!(matches!(
543 validate_update_request(&req),
544 Err(SystemError::InvalidArgument(ref m)) if m.contains("Ghost")
545 ));
546 }
547
548 #[test]
549 fn validation_rejects_empty_relation_context() {
550 let mut req = good_request();
551 req.relations[0].context = " ".into();
552 assert!(matches!(
553 validate_update_request(&req),
554 Err(SystemError::InvalidArgument(ref m)) if m.contains("context must not be empty")
555 ));
556 }
557
558 #[test]
559 fn validation_accepts_good_request() {
560 assert!(validate_update_request(&good_request()).is_ok());
561 }
562
563 #[test]
564 fn description_joins_title_and_body() {
565 let req = good_request();
566 assert_eq!(build_description(&req), "Some concept\nA short definition");
567 }
568
569 #[test]
570 fn description_falls_back_when_one_side_empty() {
571 let mut req = good_request();
572 req.content.description = String::new();
573 assert_eq!(build_description(&req), "Some concept");
574
575 req.content.title = String::new();
576 req.content.description = "Body only".into();
577 assert_eq!(build_description(&req), "Body only");
578 }
579
580 #[test]
581 fn metadata_keeps_creates_entities_in_sync_with_typed_entities() {
582 let req = good_request();
583 let update = build_context_update(&req);
584 let meta = serde_json::to_value(&update).unwrap();
585 let names = meta["creates_entities"].as_array().unwrap();
586 let typed = meta["typed_entities"].as_array().unwrap();
587 assert_eq!(names.len(), typed.len());
588 assert_eq!(names.len(), req.entities.len());
589 for (i, e) in req.entities.iter().enumerate() {
590 assert_eq!(names[i].as_str().unwrap(), e.name);
591 assert_eq!(typed[i]["name"].as_str().unwrap(), e.name);
592 }
593 }
594
595 async fn make_service(suffix: &str) -> (MemoryServiceImpl, String) {
596 let test_dir = format!(
597 "./test_data_memservice_{}_{}",
598 suffix,
599 std::time::SystemTime::now()
600 .duration_since(std::time::UNIX_EPOCH)
601 .unwrap()
602 .as_nanos()
603 );
604 std::fs::create_dir_all(&test_dir).unwrap();
605 let config = SystemConfig {
606 data_directory: test_dir.clone(),
607 ..Default::default()
608 };
609 let system = Arc::new(ConversationMemorySystem::new(config).await.unwrap());
610 (MemoryServiceImpl::new(system), test_dir)
611 }
612
613 #[tokio::test]
614 async fn health_returns_ok_status() {
615 let (svc, test_dir) = make_service("health").await;
616 let report = svc.health().await.unwrap();
617 assert!(report.status == "ok" || report.status == "degraded");
618 std::fs::remove_dir_all(&test_dir).unwrap();
619 }
620
621 #[tokio::test]
622 async fn update_context_persists_and_returns_entry_id() {
623 let (svc, test_dir) = make_service("update").await;
624 let session_id = svc.inner().create_session(None, None).await.unwrap();
625
626 let mut req = good_request();
627 req.session_id = session_id;
628 let resp = svc.update_context(req).await.unwrap();
629
630 assert_eq!(resp.session_id, session_id);
631 assert!(resp.durable);
632 std::fs::remove_dir_all(&test_dir).unwrap();
633 }
634
635 #[tokio::test]
636 async fn update_context_rejects_invalid_input_with_invalid_argument() {
637 let (svc, test_dir) = make_service("invalid").await;
638 let session_id = svc.inner().create_session(None, None).await.unwrap();
639
640 let mut req = good_request();
641 req.session_id = session_id;
642 req.entities = vec![]; let err = svc.update_context(req).await.unwrap_err();
644 assert!(matches!(err, SystemError::InvalidArgument(_)));
645 std::fs::remove_dir_all(&test_dir).unwrap();
646 }
647
648 #[tokio::test]
649 async fn bulk_update_context_persists_every_item() {
650 let (svc, test_dir) = make_service("bulk").await;
651 let session_id = svc.inner().create_session(None, None).await.unwrap();
652
653 let mut a = good_request();
654 a.session_id = session_id;
655 a.content.title = "First".into();
656 let mut b = good_request();
657 b.session_id = session_id;
658 b.content.title = "Second".into();
659
660 let resp = svc
661 .bulk_update_context(BulkUpdateContextRequest {
662 session_id,
663 updates: vec![a, b],
664 })
665 .await
666 .unwrap();
667 assert_eq!(resp.entry_ids.len(), 2);
668 assert!(resp.durable);
669 std::fs::remove_dir_all(&test_dir).unwrap();
670 }
671
672 #[tokio::test]
673 async fn update_context_returns_fast_then_pipeline_drains_in_background() {
674 let (svc, test_dir) = make_service("nonblocking").await;
684 let session_id = svc.inner().create_session(None, None).await.unwrap();
685
686 let mut warmup = good_request();
688 warmup.session_id = session_id;
689 warmup.content.title = "warmup".into();
690 let _ = svc.update_context(warmup).await.unwrap();
691
692 for _ in 0..100 {
694 if svc.pipeline().backlog() == 0 {
695 break;
696 }
697 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
698 }
699
700 let mut req = good_request();
702 req.session_id = session_id;
703 let start = std::time::Instant::now();
704 let resp = svc.update_context(req).await.unwrap();
705 let write_latency = start.elapsed();
706
707 assert!(resp.durable);
708 assert!(
709 write_latency.as_millis() < 250,
710 "update_context took {write_latency:?} on warm path — should be <250ms"
711 );
712
713 for _ in 0..100 {
715 if svc.pipeline().backlog() == 0 {
716 break;
717 }
718 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
719 }
720 assert_eq!(
721 svc.pipeline().backlog(),
722 0,
723 "pipeline backlog should drain within 2s"
724 );
725
726 std::fs::remove_dir_all(&test_dir).unwrap();
727 }
728
729 #[tokio::test]
730 async fn bulk_update_context_rejects_mismatched_session() {
731 let (svc, test_dir) = make_service("bulkmis").await;
732 let session_id = svc.inner().create_session(None, None).await.unwrap();
733
734 let mut item = good_request();
735 item.session_id = Uuid::new_v4(); let err = svc
737 .bulk_update_context(BulkUpdateContextRequest {
738 session_id,
739 updates: vec![item],
740 })
741 .await
742 .unwrap_err();
743 assert!(matches!(err, SystemError::InvalidArgument(ref m) if m.contains("does not match")));
744 std::fs::remove_dir_all(&test_dir).unwrap();
745 }
746}