1use post_cortex_core::services::PostCortexService;
16use post_cortex_memory::services::MemoryServiceImpl;
17use post_cortex_memory::ConversationMemorySystem;
18use post_cortex_storage::rocksdb_storage::SessionCheckpoint;
19use std::sync::Arc;
20use tonic::{Request, Response, Status};
21use tracing::{debug, error, info, warn};
22use uuid::Uuid;
23
24pub use post_cortex_proto::pb;
30
31mod freshness;
32mod helpers;
33
34use helpers::{
35 parse_session_role, parse_uuid, proto_bulk_item_to_request, proto_update_to_request,
36 system_error_to_status, workspace_to_info,
37};
38use pb::post_cortex_server::{PostCortex, PostCortexServer};
39use pb::*;
40
41pub struct PcxGrpcService {
50 pub(super) memory: Arc<ConversationMemorySystem>,
51 pub(super) service: Arc<MemoryServiceImpl>,
52}
53
54impl PcxGrpcService {
55 pub fn new(memory: Arc<ConversationMemorySystem>) -> Self {
57 let service = Arc::new(MemoryServiceImpl::new(memory.clone()));
58 Self { memory, service }
59 }
60
61 pub fn from_service(service: Arc<MemoryServiceImpl>) -> Self {
64 let memory = service.inner().clone();
65 Self { memory, service }
66 }
67
68 pub fn into_server(self) -> PostCortexServer<Self> {
70 PostCortexServer::new(self)
71 }
72}
73
74#[tonic::async_trait]
75impl PostCortex for PcxGrpcService {
76 async fn health(
77 &self,
78 _request: Request<HealthRequest>,
79 ) -> Result<Response<HealthResponse>, Status> {
80 let health = self.memory.get_system_health();
81
82 Ok(Response::new(HealthResponse {
83 healthy: !health.circuit_breaker_open,
84 version: env!("CARGO_PKG_VERSION").to_string(),
85 active_sessions: health.active_sessions as u64,
86 total_updates: health.total_requests,
87 embeddings_enabled: self.memory.embeddings_enabled(),
88 }))
89 }
90
91 async fn create_session(
92 &self,
93 request: Request<CreateSessionRequest>,
94 ) -> Result<Response<CreateSessionResponse>, Status> {
95 let req = request.into_inner();
96 debug!("gRPC CreateSession: name={}", req.name);
97
98 let name = if req.name.is_empty() {
99 None
100 } else {
101 Some(req.name)
102 };
103 let description = if req.description.is_empty() {
104 None
105 } else {
106 Some(req.description)
107 };
108
109 match self.memory.create_session(name, description).await {
110 Ok(session_id) => Ok(Response::new(CreateSessionResponse {
111 session_id: session_id.to_string(),
112 })),
113 Err(e) => {
114 error!("gRPC CreateSession failed: {}", e);
115 Err(Status::internal(e))
116 }
117 }
118 }
119
120 async fn list_sessions(
121 &self,
122 request: Request<ListSessionsRequest>,
123 ) -> Result<Response<ListSessionsResponse>, Status> {
124 let req = request.into_inner();
125
126 let session_ids = if !req.name_filter.is_empty() {
127 self.memory
128 .find_sessions_by_name_or_description(&req.name_filter)
129 .await
130 .map_err(Status::internal)?
131 } else {
132 self.memory
133 .list_sessions()
134 .await
135 .map_err(Status::internal)?
136 };
137
138 let limit = if req.limit > 0 {
139 req.limit as usize
140 } else {
141 100
142 };
143
144 let mut sessions = Vec::new();
145 for session_id in session_ids.into_iter().take(limit) {
146 if let Ok(session_arc) = self.memory.get_session(session_id).await {
147 let session = session_arc.load();
148 sessions.push(SessionInfo {
149 session_id: session_id.to_string(),
150 name: session.name().unwrap_or_default(),
151 description: session.description().unwrap_or_default(),
152 created_at_unix: session.created_at().timestamp(),
153 update_count: session.incremental_updates.len() as u32,
154 });
155 }
156 }
157
158 Ok(Response::new(ListSessionsResponse { sessions }))
159 }
160
161 async fn load_session(
162 &self,
163 request: Request<LoadSessionRequest>,
164 ) -> Result<Response<LoadSessionResponse>, Status> {
165 let req = request.into_inner();
166 debug!("gRPC LoadSession: session_id={}", req.session_id);
167
168 let session_id = parse_uuid(&req.session_id)?;
169
170 let session_arc = self
171 .memory
172 .get_session(session_id)
173 .await
174 .map_err(Status::not_found)?;
175 let session = session_arc.load();
176
177 let info = SessionInfo {
178 session_id: session_id.to_string(),
179 name: session.name().unwrap_or_default(),
180 description: session.description().unwrap_or_default(),
181 created_at_unix: session.created_at().timestamp(),
182 update_count: session.incremental_updates.len() as u32,
183 };
184
185 Ok(Response::new(LoadSessionResponse {
186 session: Some(info),
187 }))
188 }
189
190 async fn search_sessions(
191 &self,
192 request: Request<SearchSessionsRequest>,
193 ) -> Result<Response<SearchSessionsResponse>, Status> {
194 let req = request.into_inner();
195 debug!("gRPC SearchSessions: query={}", req.query);
196
197 if req.query.is_empty() {
198 return Err(Status::invalid_argument("query cannot be empty"));
199 }
200
201 let session_ids = self
202 .memory
203 .find_sessions_by_name_or_description(&req.query)
204 .await
205 .map_err(Status::internal)?;
206
207 let mut sessions = Vec::new();
208 for session_id in session_ids {
209 if let Ok(session_arc) = self.memory.get_session(session_id).await {
210 let session = session_arc.load();
211 sessions.push(SessionInfo {
212 session_id: session_id.to_string(),
213 name: session.name().unwrap_or_default(),
214 description: session.description().unwrap_or_default(),
215 created_at_unix: session.created_at().timestamp(),
216 update_count: session.incremental_updates.len() as u32,
217 });
218 }
219 }
220
221 Ok(Response::new(SearchSessionsResponse { sessions }))
222 }
223
224 async fn update_session_metadata(
225 &self,
226 request: Request<UpdateSessionMetadataRequest>,
227 ) -> Result<Response<UpdateSessionMetadataResponse>, Status> {
228 let req = request.into_inner();
229 debug!("gRPC UpdateSessionMetadata: session_id={}", req.session_id);
230
231 let session_id = parse_uuid(&req.session_id)?;
232
233 match self
234 .memory
235 .update_session_metadata(session_id, req.name, req.description)
236 .await
237 {
238 Ok(()) => Ok(Response::new(UpdateSessionMetadataResponse {
239 success: true,
240 })),
241 Err(e) => {
242 error!("gRPC UpdateSessionMetadata failed: {}", e);
243 Err(Status::internal(e))
244 }
245 }
246 }
247
248 async fn delete_session(
249 &self,
250 request: Request<DeleteSessionRequest>,
251 ) -> Result<Response<DeleteSessionResponse>, Status> {
252 let req = request.into_inner();
253 debug!("gRPC DeleteSession: session_id={}", req.session_id);
254
255 let session_id = parse_uuid(&req.session_id)?;
256
257 match self.memory.delete_session(session_id).await {
258 Ok(success) => Ok(Response::new(DeleteSessionResponse { success })),
259 Err(e) => {
260 error!("gRPC DeleteSession failed: {}", e);
261 Err(Status::internal(e))
262 }
263 }
264 }
265
266 async fn delete_entity(
267 &self,
268 request: Request<DeleteEntityRequest>,
269 ) -> Result<Response<DeleteEntityResponse>, Status> {
270 let req = request.into_inner();
271 debug!(
272 "gRPC DeleteEntity: session_id={} entity={}",
273 req.session_id, req.entity_name
274 );
275
276 if req.entity_name.is_empty() {
277 return Err(Status::invalid_argument("entity_name must not be empty"));
278 }
279 let session_id = parse_uuid(&req.session_id)?;
280
281 match self.memory.delete_entity(session_id, &req.entity_name).await {
282 Ok(existed) => Ok(Response::new(DeleteEntityResponse { existed })),
283 Err(e) => {
284 error!("gRPC DeleteEntity failed: {}", e);
285 Err(Status::internal(e))
286 }
287 }
288 }
289
290 async fn update_context(
291 &self,
292 request: Request<UpdateContextRequest>,
293 ) -> Result<Response<UpdateContextResponse>, Status> {
294 let req = request.into_inner();
295 debug!(
296 "gRPC UpdateContext: session={}, type={}",
297 req.session_id, req.interaction_type
298 );
299
300 let service_req = proto_update_to_request(req)?;
301 let resp = self
302 .service
303 .update_context(service_req)
304 .await
305 .map_err(|e| {
306 error!("gRPC UpdateContext failed: {}", e);
307 system_error_to_status(e)
308 })?;
309
310 Ok(Response::new(UpdateContextResponse {
311 update_id: resp.entry_id.to_string(),
312 success: resp.durable,
313 }))
314 }
315
316 async fn bulk_update_context(
317 &self,
318 request: Request<BulkUpdateContextRequest>,
319 ) -> Result<Response<BulkUpdateContextResponse>, Status> {
320 let req = request.into_inner();
321 debug!(
322 "gRPC BulkUpdateContext: session={}, count={}",
323 req.session_id,
324 req.updates.len()
325 );
326
327 let batch_session_id = parse_uuid(&req.session_id)?;
330 let mut update_ids = Vec::new();
331 let mut success_count = 0u32;
332 let mut failure_count = 0u32;
333
334 for item in req.updates {
335 let service_req = match proto_bulk_item_to_request(batch_session_id, item) {
336 Ok(r) => r,
337 Err(status) => {
338 warn!(
339 "gRPC BulkUpdateContext item translation failed: {}",
340 status.message()
341 );
342 failure_count += 1;
343 continue;
344 }
345 };
346
347 match self.service.update_context(service_req).await {
348 Ok(resp) => {
349 update_ids.push(resp.entry_id.to_string());
350 success_count += 1;
351 }
352 Err(e) => {
353 error!("gRPC BulkUpdateContext item failed: {}", e);
354 failure_count += 1;
355 }
356 }
357 }
358
359 Ok(Response::new(BulkUpdateContextResponse {
360 update_ids,
361 success_count,
362 failure_count,
363 }))
364 }
365
366 async fn create_checkpoint(
367 &self,
368 request: Request<CreateCheckpointRequest>,
369 ) -> Result<Response<CreateCheckpointResponse>, Status> {
370 let req = request.into_inner();
371 debug!("gRPC CreateCheckpoint: session_id={}", req.session_id);
372
373 let session_id = parse_uuid(&req.session_id)?;
374
375 let session_arc = self
376 .memory
377 .get_session(session_id)
378 .await
379 .map_err(Status::not_found)?;
380 let session = session_arc.load();
381
382 let checkpoint = SessionCheckpoint {
383 id: Uuid::new_v4(),
384 session_id,
385 created_at: chrono::Utc::now(),
386 structured_context: (*session.current_state).clone(),
387 recent_updates: (*session.incremental_updates).clone(),
388 code_references: (*session.code_references).clone(),
389 change_history: (*session.change_history).clone(),
390 total_updates: session.incremental_updates.len(),
391 context_quality_score: 1.0,
392 compression_ratio: 1.0,
393 };
394 let checkpoint_id = checkpoint.id.to_string();
395
396 match self.memory.storage_actor.save_checkpoint(&checkpoint).await {
397 Ok(()) => Ok(Response::new(CreateCheckpointResponse {
398 checkpoint_id,
399 success: true,
400 })),
401 Err(e) => {
402 error!("gRPC CreateCheckpoint failed: {}", e);
403 Err(Status::internal(e))
404 }
405 }
406 }
407
408 async fn query_context(
409 &self,
410 request: Request<QueryContextRequest>,
411 ) -> Result<Response<QueryContextResponse>, Status> {
412 let req = request.into_inner();
413 let session_id = parse_uuid(&req.session_id)?;
414
415 let session_arc = self
416 .memory
417 .get_session(session_id)
418 .await
419 .map_err(Status::not_found)?;
420 let session = session_arc.load();
421
422 let limit = if req.limit > 0 {
423 req.limit as usize
424 } else {
425 50
426 };
427
428 let updates: Vec<ContextUpdateEntry> = session
429 .incremental_updates
430 .iter()
431 .filter(|u| {
432 if !req.interaction_type.is_empty() {
433 let ut = format!("{:?}", u.update_type);
434 ut.to_lowercase()
435 .contains(&req.interaction_type.to_lowercase())
436 } else {
437 true
438 }
439 })
440 .filter(|u| {
441 if req.after_unix > 0 {
442 u.timestamp.timestamp() > req.after_unix
443 } else {
444 true
445 }
446 })
447 .take(limit)
448 .map(|u| ContextUpdateEntry {
449 id: u.id.to_string(),
450 interaction_type: format!("{:?}", u.update_type),
451 content: Some(ContextContent {
452 title: u.content.title.clone(),
453 description: u.content.description.clone(),
454 details: u.content.details.clone(),
455 examples: u.content.examples.clone(),
456 implications: u.content.implications.clone(),
457 code_ref: u.related_code.as_ref().map(|c| CodeReference {
458 file_path: c.file_path.clone(),
459 start_line: c.start_line,
460 end_line: c.end_line,
461 code_snippet: c.code_snippet.clone(),
462 commit_hash: c.commit_hash.clone().unwrap_or_default(),
463 branch: c.branch.clone().unwrap_or_default(),
464 change_description: c.change_description.clone(),
465 }),
466 }),
467 timestamp_unix: u.timestamp.timestamp(),
468 entities: u.creates_entities.clone(),
469 source_ref: None, })
471 .collect();
472
473 let total = updates.len() as u32;
474 Ok(Response::new(QueryContextResponse { updates, total }))
475 }
476
477 async fn semantic_search(
478 &self,
479 request: Request<SemanticSearchRequest>,
480 ) -> Result<Response<SemanticSearchResponse>, Status> {
481 let req = request.into_inner();
482 debug!(
483 "gRPC SemanticSearch: query='{}', session='{}', workspace='{:?}'",
484 req.query, req.session_id, req.workspace_id
485 );
486
487 if req.query.is_empty() {
488 return Err(Status::invalid_argument("query cannot be empty"));
489 }
490
491 let max_results = if req.max_results > 0 {
492 req.max_results as usize
493 } else {
494 10
495 };
496
497 #[cfg(feature = "embeddings")]
498 {
499 let workspace_id_str = req.workspace_id.as_deref().unwrap_or("");
501 let search_results = if !workspace_id_str.is_empty() {
502 let workspace_id = parse_uuid(workspace_id_str)?;
504 let workspace = self
505 .memory
506 .workspace_manager
507 .get_workspace(&workspace_id)
508 .ok_or_else(|| Status::not_found(format!("Workspace {workspace_id} not found")))?;
509
510 let session_ids: Vec<uuid::Uuid> = workspace
511 .get_all_sessions()
512 .into_iter()
513 .map(|(sid, _)| sid)
514 .collect();
515
516 let mut merged = Vec::new();
517 for sid in session_ids {
518 match self
519 .memory
520 .semantic_search_session(sid, &req.query, Some(max_results), None, None)
521 .await
522 {
523 Ok(results) => merged.extend(results),
524 Err(e) => warn!(
525 "SemanticSearch: session {} search failed: {}",
526 sid, e
527 ),
528 }
529 }
530 merged.sort_by(|a, b| b.combined_score.partial_cmp(&a.combined_score).unwrap_or(std::cmp::Ordering::Equal));
532 merged.truncate(max_results);
533 merged
534 } else if req.session_id.is_empty() {
535 self.memory
537 .semantic_search_global(&req.query, Some(max_results), None, None)
538 .await
539 .map_err(|e| Status::internal(format!("Search failed: {e}")))?
540 } else {
541 let session_id = parse_uuid(&req.session_id)?;
543 self.memory
544 .semantic_search_session(session_id, &req.query, Some(max_results), None, None)
545 .await
546 .map_err(|e| Status::internal(format!("Search failed: {e}")))?
547 };
548
549 let min_score = if req.min_score > 0.0 {
550 req.min_score
551 } else {
552 0.0
553 };
554
555 let results: Vec<SearchResult> = search_results
556 .into_iter()
557 .filter(|r| r.combined_score >= min_score)
558 .map(|r| SearchResult {
559 entry_id: r.content_id,
560 content: r.text_content,
561 score: r.combined_score,
562 session_id: r.session_id.to_string(),
563 content_type: format!("{:?}", r.content_type),
564 metadata: std::collections::HashMap::new(),
565 })
566 .collect();
567
568 let total_matches = results.len() as u32;
569 Ok(Response::new(SemanticSearchResponse {
570 results,
571 total_matches,
572 }))
573 }
574
575 #[cfg(not(feature = "embeddings"))]
576 {
577 Err(Status::unimplemented(
578 "Semantic search requires the 'embeddings' feature",
579 ))
580 }
581 }
582
583 async fn find_related_content(
584 &self,
585 request: Request<FindRelatedContentRequest>,
586 ) -> Result<Response<FindRelatedContentResponse>, Status> {
587 let req = request.into_inner();
588 debug!(
589 "gRPC FindRelatedContent: session={}, topic='{}'",
590 req.session_id, req.topic
591 );
592
593 if req.topic.is_empty() {
594 return Err(Status::invalid_argument("topic cannot be empty"));
595 }
596
597 #[cfg(feature = "embeddings")]
598 {
599 let session_id = parse_uuid(&req.session_id)?;
600 let max_results = req.limit.unwrap_or(10) as usize;
601
602 let search_results = self
603 .memory
604 .semantic_search_session(session_id, &req.topic, Some(max_results), None, None)
605 .await
606 .map_err(|e| Status::internal(format!("Search failed: {e}")))?;
607
608 let results: Vec<SearchResult> = search_results
609 .into_iter()
610 .map(|r| SearchResult {
611 entry_id: r.content_id,
612 content: r.text_content,
613 score: r.combined_score,
614 session_id: r.session_id.to_string(),
615 content_type: format!("{:?}", r.content_type),
616 metadata: std::collections::HashMap::new(),
617 })
618 .collect();
619
620 Ok(Response::new(FindRelatedContentResponse { results }))
621 }
622
623 #[cfg(not(feature = "embeddings"))]
624 {
625 Err(Status::unimplemented(
626 "FindRelatedContent requires the 'embeddings' feature",
627 ))
628 }
629 }
630
631 async fn vectorize_session(
632 &self,
633 request: Request<VectorizeSessionRequest>,
634 ) -> Result<Response<VectorizeSessionResponse>, Status> {
635 let req = request.into_inner();
636 debug!("gRPC VectorizeSession: session_id={}", req.session_id);
637
638 let session_id = parse_uuid(&req.session_id)?;
639
640 #[cfg(feature = "embeddings")]
641 {
642 match self.memory.vectorize_session(session_id).await {
643 Ok(vectors_created) => Ok(Response::new(VectorizeSessionResponse {
644 success: true,
645 vectors_created: vectors_created as u32,
646 })),
647 Err(e) => {
648 error!("gRPC VectorizeSession failed: {}", e);
649 Err(Status::internal(e))
650 }
651 }
652 }
653
654 #[cfg(not(feature = "embeddings"))]
655 {
656 Err(Status::unimplemented(
657 "VectorizeSession requires the 'embeddings' feature",
658 ))
659 }
660 }
661
662 async fn get_vectorization_stats(
663 &self,
664 _request: Request<GetVectorizationStatsRequest>,
665 ) -> Result<Response<TextResponse>, Status> {
666 debug!("gRPC GetVectorizationStats");
667
668 #[cfg(feature = "embeddings")]
669 {
670 match self.memory.get_vectorization_stats() {
671 Ok(stats) => {
672 let text =
673 serde_json::to_string_pretty(&stats).unwrap_or_else(|_| "{}".to_string());
674 Ok(Response::new(TextResponse { text }))
675 }
676 Err(e) => {
677 error!("gRPC GetVectorizationStats failed: {}", e);
678 Err(Status::internal(e))
679 }
680 }
681 }
682
683 #[cfg(not(feature = "embeddings"))]
684 {
685 Err(Status::unimplemented(
686 "GetVectorizationStats requires the 'embeddings' feature",
687 ))
688 }
689 }
690
691 async fn get_structured_summary(
694 &self,
695 request: Request<GetStructuredSummaryRequest>,
696 ) -> Result<Response<TextResponse>, Status> {
697 let req = request.into_inner();
698 debug!("gRPC GetStructuredSummary: session_id={}", req.session_id);
699
700 let session_id = parse_uuid(&req.session_id)?;
701 let session_arc = self.memory.get_session(session_id).await
702 .map_err(Status::not_found)?;
703 let session = session_arc.load();
704
705 use post_cortex_core::summary::{SummaryGenerator, SummaryOptions};
706 let user_requested_compact = req.compact.unwrap_or(false);
707 const MAX_TOKENS: usize = 50_000;
708
709 let (_estimated_tokens, should_compact) =
710 SummaryGenerator::estimate_summary_size(&session, MAX_TOKENS);
711 let auto_compacted = !user_requested_compact && should_compact;
712
713 let options = if user_requested_compact || auto_compacted {
714 SummaryOptions::compact()
715 } else {
716 SummaryOptions {
717 decisions_limit: req.decisions_limit.map(|v| v as usize),
718 entities_limit: req.entities_limit.map(|v| v as usize),
719 questions_limit: req.questions_limit.map(|v| v as usize),
720 concepts_limit: req.concepts_limit.map(|v| v as usize),
721 min_confidence: req.min_confidence,
722 compact: false,
723 }
724 };
725
726 let summary = SummaryGenerator::generate_structured_summary_filtered(&session, &options);
727 let text = serde_json::to_string_pretty(&summary)
728 .unwrap_or_else(|_| format!("{summary:?}"));
729
730 Ok(Response::new(TextResponse { text }))
731 }
732
733 async fn get_key_decisions(
734 &self,
735 request: Request<GetKeyDecisionsRequest>,
736 ) -> Result<Response<TextResponse>, Status> {
737 let req = request.into_inner();
738 debug!("gRPC GetKeyDecisions: session_id={}", req.session_id);
739
740 let session_id = parse_uuid(&req.session_id)?;
741 let session_arc = self.memory.get_session(session_id).await
742 .map_err(Status::not_found)?;
743 let session = session_arc.load();
744
745 use post_cortex_core::summary::SummaryGenerator;
746 let decisions = SummaryGenerator::extract_decision_timeline(&session);
747 let text = serde_json::to_string_pretty(&decisions)
748 .unwrap_or_else(|_| format!("{decisions:?}"));
749
750 Ok(Response::new(TextResponse { text }))
751 }
752
753 async fn get_key_insights(
754 &self,
755 request: Request<GetKeyInsightsRequest>,
756 ) -> Result<Response<TextResponse>, Status> {
757 let req = request.into_inner();
758 debug!("gRPC GetKeyInsights: session_id={}", req.session_id);
759
760 let session_id = parse_uuid(&req.session_id)?;
761 let session_arc = self.memory.get_session(session_id).await
762 .map_err(Status::not_found)?;
763 let session = session_arc.load();
764
765 use post_cortex_core::summary::SummaryGenerator;
766 let insights = SummaryGenerator::extract_key_insights(&session, req.limit.map(|v| v as usize).unwrap_or(5));
767 let text = serde_json::to_string_pretty(&insights)
768 .unwrap_or_else(|_| format!("{insights:?}"));
769
770 Ok(Response::new(TextResponse { text }))
771 }
772
773 async fn get_entity_importance(
774 &self,
775 request: Request<GetEntityImportanceRequest>,
776 ) -> Result<Response<TextResponse>, Status> {
777 let req = request.into_inner();
778 debug!("gRPC GetEntityImportance: session_id={}", req.session_id);
779
780 let session_id = parse_uuid(&req.session_id)?;
781 let session_arc = self.memory.get_session(session_id).await
782 .map_err(Status::not_found)?;
783 let session = session_arc.load();
784
785 let mut analysis = session.entity_graph.analyze_entity_importance();
786 if let Some(min_imp) = req.min_importance {
787 analysis.retain(|a| a.importance_score >= min_imp);
788 }
789 if let Some(limit) = req.limit {
790 analysis.truncate(limit as usize);
791 }
792 let text = serde_json::to_string_pretty(&analysis)
793 .unwrap_or_else(|_| format!("{analysis:?}"));
794
795 Ok(Response::new(TextResponse { text }))
796 }
797
798 async fn get_entity_network(
799 &self,
800 request: Request<GetEntityNetworkRequest>,
801 ) -> Result<Response<TextResponse>, Status> {
802 let req = request.into_inner();
803 debug!("gRPC GetEntityNetwork: session_id={}", req.session_id);
804
805 let session_id = parse_uuid(&req.session_id)?;
806 let session_arc = self.memory.get_session(session_id).await
807 .map_err(Status::not_found)?;
808 let session = session_arc.load();
809
810 let network = match req.center_entity {
811 Some(entity) => session.entity_graph.get_entity_network(&entity, 2),
812 None => {
813 let top_entities = session.entity_graph.get_most_important_entities(1);
814 if let Some(top) = top_entities.first() {
815 session.entity_graph.get_entity_network(&top.name, 2)
816 } else {
817 session.entity_graph.get_entity_network("", 2)
818 }
819 }
820 };
821 let text = serde_json::to_string_pretty(&network)
822 .unwrap_or_else(|_| format!("{network:?}"));
823
824 Ok(Response::new(TextResponse { text }))
825 }
826
827 async fn get_session_statistics(
828 &self,
829 request: Request<GetSessionStatisticsRequest>,
830 ) -> Result<Response<TextResponse>, Status> {
831 let req = request.into_inner();
832 debug!("gRPC GetSessionStatistics: session_id={}", req.session_id);
833
834 let session_id = parse_uuid(&req.session_id)?;
835 let session_arc = self.memory.get_session(session_id).await
836 .map_err(Status::not_found)?;
837 let session = session_arc.load();
838
839 use post_cortex_core::summary::SummaryGenerator;
840 let stats = SummaryGenerator::calculate_session_stats(&session);
841 let text = serde_json::to_string_pretty(&stats)
842 .unwrap_or_else(|_| format!("{stats:?}"));
843
844 Ok(Response::new(TextResponse { text }))
845 }
846
847 async fn assemble_context(
850 &self,
851 request: Request<AssembleContextRequest>,
852 ) -> Result<Response<AssembleContextResponse>, Status> {
853 let req = request.into_inner();
854 debug!(
855 "gRPC AssembleContext: session_id={}, workspace_id={:?}, query={}",
856 req.session_id, req.workspace_id, req.query
857 );
858
859 let token_budget = if req.token_budget == 0 { 4000 } else { req.token_budget as usize };
860
861 use post_cortex_memory::context_assembly;
862 use post_cortex_core::graph::entity_graph::SimpleEntityGraph;
863
864 let (updates, graph) = if req.workspace_id.as_deref().map(|s| !s.is_empty()).unwrap_or(false) {
866 let workspace_id = parse_uuid(req.workspace_id.as_deref().unwrap_or(""))?;
868 let workspace = self
869 .memory
870 .workspace_manager
871 .get_workspace(&workspace_id)
872 .ok_or_else(|| Status::not_found(format!("Workspace {workspace_id} not found")))?;
873
874 let mut all_updates = Vec::new();
875 let mut merged_graph = SimpleEntityGraph::new();
876
877 for (ws_session_id, _role) in workspace.get_all_sessions() {
878 match self.memory.get_session(ws_session_id).await {
879 Ok(session_arc) => {
880 let session = session_arc.load();
881 all_updates.extend(
882 session.hot_context.iter().iter().cloned(),
883 );
884 all_updates.extend(
885 session.warm_context.iter().map(|c| c.update.clone()),
886 );
887 merged_graph.merge_from(&session.entity_graph);
888 }
889 Err(e) => {
890 warn!(
891 "AssembleContext: could not load session {} from workspace {}: {}",
892 ws_session_id, workspace_id, e
893 );
894 }
895 }
896 }
897
898 (all_updates, merged_graph)
899 } else {
900 let session_id = parse_uuid(&req.session_id)?;
902 let session_arc = self.memory.get_session(session_id).await
903 .map_err(Status::not_found)?;
904 let session = session_arc.load();
905 let updates: Vec<_> = session.hot_context.iter().iter()
906 .chain(session.warm_context.iter().map(|c| &c.update))
907 .cloned()
908 .collect();
909 (updates, (*session.entity_graph).clone())
910 };
911
912 let assembled = context_assembly::assemble_context(
913 &req.query,
914 &graph,
915 &updates,
916 token_budget,
917 );
918
919 let formatted_text = context_assembly::format_for_llm(&assembled);
920
921 let items: Vec<AssembledContextItem> = assembled.items.iter().map(|item| {
923 let (source, via_entity) = match &item.source {
924 context_assembly::ContextSource::SemanticMatch => ("semantic_match".to_string(), String::new()),
925 context_assembly::ContextSource::GraphTraversal { via_entity } => ("graph_traversal".to_string(), via_entity.clone()),
926 context_assembly::ContextSource::RecentUpdate => ("recent_update".to_string(), String::new()),
927 };
928 AssembledContextItem {
929 text: item.text.clone(),
930 score: item.score,
931 source,
932 via_entity,
933 entities: item.entities.clone(),
934 token_estimate: item.token_estimate as u32,
935 entry_id: item.entry_id.clone(),
936 }
937 }).collect();
938
939 let entity_context: Vec<AssembledEntityContext> = assembled.entity_context.iter().map(|ec| {
940 let (relevance, via_entity, via_relation) = match &ec.relevance {
941 context_assembly::EntityRelevance::DirectMention => ("direct_mention".to_string(), String::new(), String::new()),
942 context_assembly::EntityRelevance::GraphNeighbor { via, relation } => ("graph_neighbor".to_string(), via.clone(), relation.clone()),
943 };
944 let relationships: Vec<EntityRelation> = ec.relationships.iter().map(|r| {
945 EntityRelation {
946 from_entity: r.from_entity.clone(),
947 to_entity: r.to_entity.clone(),
948 relation_type: format!("{:?}", r.relation_type),
949 context: r.context.clone(),
950 }
951 }).collect();
952 AssembledEntityContext {
953 name: ec.name.clone(),
954 relevance,
955 via_entity,
956 via_relation,
957 relationships,
958 }
959 }).collect();
960
961 let impact: Vec<pb::ImpactEntry> = assembled.impact.iter().map(|i| {
962 pb::ImpactEntry {
963 entity: i.entity.clone(),
964 depends_on: i.depends_on.clone(),
965 relation_type: format!("{:?}", i.relation_type),
966 context: i.context.clone(),
967 }
968 }).collect();
969
970 Ok(Response::new(AssembleContextResponse {
971 items,
972 entity_context,
973 impact,
974 total_tokens: assembled.total_tokens as u32,
975 formatted_text,
976 }))
977 }
978
979 async fn create_workspace(
982 &self,
983 request: Request<CreateWorkspaceRequest>,
984 ) -> Result<Response<CreateWorkspaceResponse>, Status> {
985 let req = request.into_inner();
986 debug!("gRPC CreateWorkspace: name={}", req.name);
987
988 let workspace_id = self
989 .memory
990 .workspace_manager
991 .create_workspace(req.name.clone(), req.description.clone());
992
993 if let Err(e) = self
995 .memory
996 .save_workspace_metadata(workspace_id, &req.name, &req.description, &[])
997 .await
998 {
999 warn!("gRPC CreateWorkspace: failed to persist metadata: {}", e);
1000 }
1001
1002 Ok(Response::new(CreateWorkspaceResponse {
1003 workspace_id: workspace_id.to_string(),
1004 }))
1005 }
1006
1007 async fn get_workspace(
1008 &self,
1009 request: Request<GetWorkspaceRequest>,
1010 ) -> Result<Response<GetWorkspaceResponse>, Status> {
1011 let req = request.into_inner();
1012 debug!("gRPC GetWorkspace: workspace_id={}", req.workspace_id);
1013
1014 let workspace_id = parse_uuid(&req.workspace_id)?;
1015
1016 let workspace = self
1017 .memory
1018 .workspace_manager
1019 .get_workspace(&workspace_id)
1020 .ok_or_else(|| Status::not_found(format!("Workspace {} not found", workspace_id)))?;
1021
1022 let info = workspace_to_info(&workspace);
1023
1024 Ok(Response::new(GetWorkspaceResponse {
1025 workspace: Some(info),
1026 }))
1027 }
1028
1029 async fn list_workspaces(
1030 &self,
1031 _request: Request<ListWorkspacesRequest>,
1032 ) -> Result<Response<ListWorkspacesResponse>, Status> {
1033 debug!("gRPC ListWorkspaces");
1034
1035 let workspaces = self.memory.workspace_manager.list_workspaces();
1036
1037 let infos: Vec<WorkspaceInfo> = workspaces.iter().map(|ws| workspace_to_info(ws)).collect();
1038
1039 Ok(Response::new(ListWorkspacesResponse { workspaces: infos }))
1040 }
1041
1042 async fn delete_workspace(
1043 &self,
1044 request: Request<DeleteWorkspaceRequest>,
1045 ) -> Result<Response<DeleteWorkspaceResponse>, Status> {
1046 let req = request.into_inner();
1047 debug!("gRPC DeleteWorkspace: workspace_id={}", req.workspace_id);
1048
1049 let workspace_id = parse_uuid(&req.workspace_id)?;
1050
1051 let deleted = self
1052 .memory
1053 .workspace_manager
1054 .delete_workspace(&workspace_id);
1055
1056 let success = deleted.is_some();
1057 Ok(Response::new(DeleteWorkspaceResponse { success }))
1058 }
1059
1060 async fn add_session_to_workspace(
1061 &self,
1062 request: Request<AddSessionToWorkspaceRequest>,
1063 ) -> Result<Response<AddSessionToWorkspaceResponse>, Status> {
1064 let req = request.into_inner();
1065 debug!(
1066 "gRPC AddSessionToWorkspace: workspace={}, session={}, role={}",
1067 req.workspace_id, req.session_id, req.role
1068 );
1069
1070 let workspace_id = parse_uuid(&req.workspace_id)?;
1071 let session_id = parse_uuid(&req.session_id)?;
1072 let role = parse_session_role(&req.role);
1073
1074 self.memory
1075 .workspace_manager
1076 .add_session_to_workspace(&workspace_id, session_id, role)
1077 .map_err(Status::not_found)?;
1078
1079 if let Some(ws) = self.memory.workspace_manager.get_workspace(&workspace_id) {
1081 let session_ids: Vec<Uuid> = ws.session_ids.iter().map(|entry| *entry.key()).collect();
1082 if let Err(e) = self
1083 .memory
1084 .save_workspace_metadata(workspace_id, &ws.name, &ws.description, &session_ids)
1085 .await
1086 {
1087 warn!(
1088 "gRPC AddSessionToWorkspace: failed to persist metadata: {}",
1089 e
1090 );
1091 }
1092 }
1093
1094 Ok(Response::new(AddSessionToWorkspaceResponse {
1095 success: true,
1096 }))
1097 }
1098
1099 async fn remove_session_from_workspace(
1100 &self,
1101 request: Request<RemoveSessionFromWorkspaceRequest>,
1102 ) -> Result<Response<RemoveSessionFromWorkspaceResponse>, Status> {
1103 let req = request.into_inner();
1104 debug!(
1105 "gRPC RemoveSessionFromWorkspace: workspace={}, session={}",
1106 req.workspace_id, req.session_id
1107 );
1108
1109 let workspace_id = parse_uuid(&req.workspace_id)?;
1110 let session_id = parse_uuid(&req.session_id)?;
1111
1112 self.memory
1113 .workspace_manager
1114 .remove_session_from_workspace(&workspace_id, &session_id)
1115 .map_err(Status::not_found)?;
1116
1117 if let Some(ws) = self.memory.workspace_manager.get_workspace(&workspace_id) {
1119 let session_ids: Vec<Uuid> = ws.session_ids.iter().map(|entry| *entry.key()).collect();
1120 if let Err(e) = self
1121 .memory
1122 .save_workspace_metadata(workspace_id, &ws.name, &ws.description, &session_ids)
1123 .await
1124 {
1125 warn!(
1126 "gRPC RemoveSessionFromWorkspace: failed to persist metadata: {}",
1127 e
1128 );
1129 }
1130 }
1131
1132 Ok(Response::new(RemoveSessionFromWorkspaceResponse {
1133 success: true,
1134 }))
1135 }
1136
1137
1138 async fn register_source(
1141 &self,
1142 request: Request<RegisterSourceRequest>,
1143 ) -> Result<Response<RegisterSourceAck>, Status> {
1144 self.register_source_impl(request).await
1145 }
1146
1147 async fn register_source_batch(
1148 &self,
1149 request: Request<RegisterSourceBatchRequest>,
1150 ) -> Result<Response<RegisterSourceBatchAck>, Status> {
1151 self.register_source_batch_impl(request).await
1152 }
1153
1154 async fn check_freshness(
1155 &self,
1156 request: Request<FreshnessRequest>,
1157 ) -> Result<Response<FreshnessReport>, Status> {
1158 self.check_freshness_impl(request).await
1159 }
1160
1161 async fn invalidate(
1162 &self,
1163 request: Request<InvalidateRequest>,
1164 ) -> Result<Response<InvalidateAck>, Status> {
1165 self.invalidate_impl(request).await
1166 }
1167
1168 async fn register_symbol_dependency(
1169 &self,
1170 request: Request<RegisterSymbolDependencyRequest>,
1171 ) -> Result<Response<RegisterSymbolDependencyAck>, Status> {
1172 self.register_symbol_dependency_impl(request).await
1173 }
1174
1175 async fn cascade_invalidate(
1176 &self,
1177 request: Request<CascadeInvalidateRequest>,
1178 ) -> Result<Response<CascadeInvalidateReport>, Status> {
1179 self.cascade_invalidate_impl(request).await
1180 }
1181
1182 async fn get_stale_entries_by_source(
1183 &self,
1184 request: Request<GetStaleEntriesBySourceRequest>,
1185 ) -> Result<Response<GetStaleEntriesBySourceResponse>, Status> {
1186 self.get_stale_entries_by_source_impl(request).await
1187 }
1188}
1189
1190pub async fn start_grpc_server(
1193 memory: Arc<ConversationMemorySystem>,
1194 addr: std::net::SocketAddr,
1195) -> Result<(), String> {
1196 let service = PcxGrpcService::new(memory);
1197
1198 info!("Starting gRPC server on {}", addr);
1199
1200 tonic::transport::Server::builder()
1201 .add_service(service.into_server())
1202 .serve(addr)
1203 .await
1204 .map_err(|e| format!("gRPC server error: {e}"))
1205}