1use post_cortex_core::services::PostCortexService;
16use post_cortex_memory::ConversationMemorySystem;
17use post_cortex_memory::services::MemoryServiceImpl;
18use post_cortex_storage::rocksdb_storage::SessionCheckpoint;
19use std::sync::Arc;
20use tonic::{Request, Response, Status};
21use tracing::{debug, error, info, warn};
22use uuid::Uuid;
23
24pub use post_cortex_proto::pb;
30
31mod freshness;
32mod helpers;
33
34use helpers::{
35 parse_session_role, parse_uuid, proto_bulk_item_to_request, proto_update_to_request,
36 system_error_to_status, workspace_to_info,
37};
38use pb::post_cortex_server::{PostCortex, PostCortexServer};
39use pb::*;
40
41pub struct PcxGrpcService {
50 pub(super) memory: Arc<ConversationMemorySystem>,
51 pub(super) service: Arc<MemoryServiceImpl>,
52}
53
54impl PcxGrpcService {
55 pub fn new(memory: Arc<ConversationMemorySystem>) -> Self {
57 let service = Arc::new(MemoryServiceImpl::new(memory.clone()));
58 Self { memory, service }
59 }
60
61 pub fn from_service(service: Arc<MemoryServiceImpl>) -> Self {
64 let memory = service.inner().clone();
65 Self { memory, service }
66 }
67
68 pub fn into_server(self) -> PostCortexServer<Self> {
70 PostCortexServer::new(self)
71 }
72}
73
74#[tonic::async_trait]
75impl PostCortex for PcxGrpcService {
76 async fn health(
77 &self,
78 _request: Request<HealthRequest>,
79 ) -> Result<Response<HealthResponse>, Status> {
80 let health = self.memory.get_system_health();
81
82 Ok(Response::new(HealthResponse {
83 healthy: !health.circuit_breaker_open,
84 version: env!("CARGO_PKG_VERSION").to_string(),
85 active_sessions: health.active_sessions as u64,
86 total_updates: health.total_requests,
87 embeddings_enabled: self.memory.embeddings_enabled(),
88 }))
89 }
90
91 async fn create_session(
92 &self,
93 request: Request<CreateSessionRequest>,
94 ) -> Result<Response<CreateSessionResponse>, Status> {
95 let req = request.into_inner();
96 debug!("gRPC CreateSession: name={}", req.name);
97
98 let name = if req.name.is_empty() {
99 None
100 } else {
101 Some(req.name)
102 };
103 let description = if req.description.is_empty() {
104 None
105 } else {
106 Some(req.description)
107 };
108
109 match self.memory.create_session(name, description).await {
110 Ok(session_id) => Ok(Response::new(CreateSessionResponse {
111 session_id: session_id.to_string(),
112 })),
113 Err(e) => {
114 error!("gRPC CreateSession failed: {}", e);
115 Err(Status::internal(e))
116 }
117 }
118 }
119
120 async fn list_sessions(
121 &self,
122 request: Request<ListSessionsRequest>,
123 ) -> Result<Response<ListSessionsResponse>, Status> {
124 let req = request.into_inner();
125
126 let session_ids = if !req.name_filter.is_empty() {
127 self.memory
128 .find_sessions_by_name_or_description(&req.name_filter)
129 .await
130 .map_err(Status::internal)?
131 } else {
132 self.memory
133 .list_sessions()
134 .await
135 .map_err(Status::internal)?
136 };
137
138 let limit = if req.limit > 0 {
139 req.limit as usize
140 } else {
141 100
142 };
143
144 let mut sessions = Vec::new();
145 for session_id in session_ids.into_iter().take(limit) {
146 if let Ok(session_arc) = self.memory.get_session(session_id).await {
147 let session = session_arc.load();
148 sessions.push(SessionInfo {
149 session_id: session_id.to_string(),
150 name: session.name().unwrap_or_default(),
151 description: session.description().unwrap_or_default(),
152 created_at_unix: session.created_at().timestamp(),
153 update_count: session.incremental_updates.len() as u32,
154 });
155 }
156 }
157
158 Ok(Response::new(ListSessionsResponse { sessions }))
159 }
160
161 async fn load_session(
162 &self,
163 request: Request<LoadSessionRequest>,
164 ) -> Result<Response<LoadSessionResponse>, Status> {
165 let req = request.into_inner();
166 debug!("gRPC LoadSession: session_id={}", req.session_id);
167
168 let session_id = parse_uuid(&req.session_id)?;
169
170 let session_arc = self
171 .memory
172 .get_session(session_id)
173 .await
174 .map_err(Status::not_found)?;
175 let session = session_arc.load();
176
177 let info = SessionInfo {
178 session_id: session_id.to_string(),
179 name: session.name().unwrap_or_default(),
180 description: session.description().unwrap_or_default(),
181 created_at_unix: session.created_at().timestamp(),
182 update_count: session.incremental_updates.len() as u32,
183 };
184
185 Ok(Response::new(LoadSessionResponse {
186 session: Some(info),
187 }))
188 }
189
190 async fn search_sessions(
191 &self,
192 request: Request<SearchSessionsRequest>,
193 ) -> Result<Response<SearchSessionsResponse>, Status> {
194 let req = request.into_inner();
195 debug!("gRPC SearchSessions: query={}", req.query);
196
197 if req.query.is_empty() {
198 return Err(Status::invalid_argument("query cannot be empty"));
199 }
200
201 let session_ids = self
202 .memory
203 .find_sessions_by_name_or_description(&req.query)
204 .await
205 .map_err(Status::internal)?;
206
207 let mut sessions = Vec::new();
208 for session_id in session_ids {
209 if let Ok(session_arc) = self.memory.get_session(session_id).await {
210 let session = session_arc.load();
211 sessions.push(SessionInfo {
212 session_id: session_id.to_string(),
213 name: session.name().unwrap_or_default(),
214 description: session.description().unwrap_or_default(),
215 created_at_unix: session.created_at().timestamp(),
216 update_count: session.incremental_updates.len() as u32,
217 });
218 }
219 }
220
221 Ok(Response::new(SearchSessionsResponse { sessions }))
222 }
223
224 async fn update_session_metadata(
225 &self,
226 request: Request<UpdateSessionMetadataRequest>,
227 ) -> Result<Response<UpdateSessionMetadataResponse>, Status> {
228 let req = request.into_inner();
229 debug!("gRPC UpdateSessionMetadata: session_id={}", req.session_id);
230
231 let session_id = parse_uuid(&req.session_id)?;
232
233 match self
234 .memory
235 .update_session_metadata(session_id, req.name, req.description)
236 .await
237 {
238 Ok(()) => Ok(Response::new(UpdateSessionMetadataResponse {
239 success: true,
240 })),
241 Err(e) => {
242 error!("gRPC UpdateSessionMetadata failed: {}", e);
243 Err(Status::internal(e))
244 }
245 }
246 }
247
248 async fn delete_session(
249 &self,
250 request: Request<DeleteSessionRequest>,
251 ) -> Result<Response<DeleteSessionResponse>, Status> {
252 let req = request.into_inner();
253 debug!("gRPC DeleteSession: session_id={}", req.session_id);
254
255 let session_id = parse_uuid(&req.session_id)?;
256
257 match self.memory.delete_session(session_id).await {
258 Ok(success) => Ok(Response::new(DeleteSessionResponse { success })),
259 Err(e) => {
260 error!("gRPC DeleteSession failed: {}", e);
261 Err(Status::internal(e))
262 }
263 }
264 }
265
266 async fn delete_entity(
267 &self,
268 request: Request<DeleteEntityRequest>,
269 ) -> Result<Response<DeleteEntityResponse>, Status> {
270 let req = request.into_inner();
271 debug!(
272 "gRPC DeleteEntity: session_id={} entity={}",
273 req.session_id, req.entity_name
274 );
275
276 if req.entity_name.is_empty() {
277 return Err(Status::invalid_argument("entity_name must not be empty"));
278 }
279 let session_id = parse_uuid(&req.session_id)?;
280
281 match self
282 .memory
283 .delete_entity(session_id, &req.entity_name)
284 .await
285 {
286 Ok(existed) => Ok(Response::new(DeleteEntityResponse { existed })),
287 Err(e) => {
288 error!("gRPC DeleteEntity failed: {}", e);
289 Err(Status::internal(e))
290 }
291 }
292 }
293
294 async fn update_context(
295 &self,
296 request: Request<UpdateContextRequest>,
297 ) -> Result<Response<UpdateContextResponse>, Status> {
298 let req = request.into_inner();
299 debug!(
300 "gRPC UpdateContext: session={}, type={}",
301 req.session_id, req.interaction_type
302 );
303
304 let service_req = proto_update_to_request(req)?;
305 let resp = self
306 .service
307 .update_context(service_req)
308 .await
309 .map_err(|e| {
310 error!("gRPC UpdateContext failed: {}", e);
311 system_error_to_status(e)
312 })?;
313
314 Ok(Response::new(UpdateContextResponse {
315 update_id: resp.entry_id.to_string(),
316 success: resp.durable,
317 }))
318 }
319
320 async fn bulk_update_context(
321 &self,
322 request: Request<BulkUpdateContextRequest>,
323 ) -> Result<Response<BulkUpdateContextResponse>, Status> {
324 let req = request.into_inner();
325 debug!(
326 "gRPC BulkUpdateContext: session={}, count={}",
327 req.session_id,
328 req.updates.len()
329 );
330
331 let batch_session_id = parse_uuid(&req.session_id)?;
334 let mut update_ids = Vec::new();
335 let mut success_count = 0u32;
336 let mut failure_count = 0u32;
337
338 for item in req.updates {
339 let service_req = match proto_bulk_item_to_request(batch_session_id, item) {
340 Ok(r) => r,
341 Err(status) => {
342 warn!(
343 "gRPC BulkUpdateContext item translation failed: {}",
344 status.message()
345 );
346 failure_count += 1;
347 continue;
348 }
349 };
350
351 match self.service.update_context(service_req).await {
352 Ok(resp) => {
353 update_ids.push(resp.entry_id.to_string());
354 success_count += 1;
355 }
356 Err(e) => {
357 error!("gRPC BulkUpdateContext item failed: {}", e);
358 failure_count += 1;
359 }
360 }
361 }
362
363 Ok(Response::new(BulkUpdateContextResponse {
364 update_ids,
365 success_count,
366 failure_count,
367 }))
368 }
369
370 async fn create_checkpoint(
371 &self,
372 request: Request<CreateCheckpointRequest>,
373 ) -> Result<Response<CreateCheckpointResponse>, Status> {
374 let req = request.into_inner();
375 debug!("gRPC CreateCheckpoint: session_id={}", req.session_id);
376
377 let session_id = parse_uuid(&req.session_id)?;
378
379 let session_arc = self
380 .memory
381 .get_session(session_id)
382 .await
383 .map_err(Status::not_found)?;
384 let session = session_arc.load();
385
386 let checkpoint = SessionCheckpoint {
387 id: Uuid::new_v4(),
388 session_id,
389 created_at: chrono::Utc::now(),
390 structured_context: (*session.current_state).clone(),
391 recent_updates: (*session.incremental_updates).clone(),
392 code_references: (*session.code_references).clone(),
393 change_history: (*session.change_history).clone(),
394 total_updates: session.incremental_updates.len(),
395 context_quality_score: 1.0,
396 compression_ratio: 1.0,
397 };
398 let checkpoint_id = checkpoint.id.to_string();
399
400 match self.memory.storage_actor.save_checkpoint(&checkpoint).await {
401 Ok(()) => Ok(Response::new(CreateCheckpointResponse {
402 checkpoint_id,
403 success: true,
404 })),
405 Err(e) => {
406 error!("gRPC CreateCheckpoint failed: {}", e);
407 Err(Status::internal(e))
408 }
409 }
410 }
411
412 async fn query_context(
413 &self,
414 request: Request<QueryContextRequest>,
415 ) -> Result<Response<QueryContextResponse>, Status> {
416 let req = request.into_inner();
417 let session_id = parse_uuid(&req.session_id)?;
418
419 let session_arc = self
420 .memory
421 .get_session(session_id)
422 .await
423 .map_err(Status::not_found)?;
424 let session = session_arc.load();
425
426 let limit = if req.limit > 0 {
427 req.limit as usize
428 } else {
429 50
430 };
431
432 let updates: Vec<ContextUpdateEntry> = session
433 .incremental_updates
434 .iter()
435 .filter(|u| {
436 if !req.interaction_type.is_empty() {
437 let ut = format!("{:?}", u.update_type);
438 ut.to_lowercase()
439 .contains(&req.interaction_type.to_lowercase())
440 } else {
441 true
442 }
443 })
444 .filter(|u| {
445 if req.after_unix > 0 {
446 u.timestamp.timestamp() > req.after_unix
447 } else {
448 true
449 }
450 })
451 .take(limit)
452 .map(|u| ContextUpdateEntry {
453 id: u.id.to_string(),
454 interaction_type: format!("{:?}", u.update_type),
455 content: Some(ContextContent {
456 title: u.content.title.clone(),
457 description: u.content.description.clone(),
458 details: u.content.details.clone(),
459 examples: u.content.examples.clone(),
460 implications: u.content.implications.clone(),
461 code_ref: u.related_code.as_ref().map(|c| CodeReference {
462 file_path: c.file_path.clone(),
463 start_line: c.start_line,
464 end_line: c.end_line,
465 code_snippet: c.code_snippet.clone(),
466 commit_hash: c.commit_hash.clone().unwrap_or_default(),
467 branch: c.branch.clone().unwrap_or_default(),
468 change_description: c.change_description.clone(),
469 }),
470 }),
471 timestamp_unix: u.timestamp.timestamp(),
472 entities: u.creates_entities.clone(),
473 source_ref: None, })
475 .collect();
476
477 let total = updates.len() as u32;
478 Ok(Response::new(QueryContextResponse { updates, total }))
479 }
480
481 async fn semantic_search(
482 &self,
483 request: Request<SemanticSearchRequest>,
484 ) -> Result<Response<SemanticSearchResponse>, Status> {
485 let req = request.into_inner();
486 debug!(
487 "gRPC SemanticSearch: query='{}', session='{}', workspace='{:?}'",
488 req.query, req.session_id, req.workspace_id
489 );
490
491 if req.query.is_empty() {
492 return Err(Status::invalid_argument("query cannot be empty"));
493 }
494
495 let max_results = if req.max_results > 0 {
496 req.max_results as usize
497 } else {
498 10
499 };
500
501 #[cfg(feature = "embeddings")]
502 {
503 let workspace_id_str = req.workspace_id.as_deref().unwrap_or("");
505 let search_results = if !workspace_id_str.is_empty() {
506 let workspace_id = parse_uuid(workspace_id_str)?;
508 let workspace = self
509 .memory
510 .workspace_manager
511 .get_workspace(&workspace_id)
512 .ok_or_else(|| {
513 Status::not_found(format!("Workspace {workspace_id} not found"))
514 })?;
515
516 let session_ids: Vec<uuid::Uuid> = workspace
517 .get_all_sessions()
518 .into_iter()
519 .map(|(sid, _)| sid)
520 .collect();
521
522 let mut merged = Vec::new();
523 for sid in session_ids {
524 match self
525 .memory
526 .semantic_search_session(sid, &req.query, Some(max_results), None, None)
527 .await
528 {
529 Ok(results) => merged.extend(results),
530 Err(e) => warn!("SemanticSearch: session {} search failed: {}", sid, e),
531 }
532 }
533 merged.sort_by(|a, b| {
535 b.combined_score
536 .partial_cmp(&a.combined_score)
537 .unwrap_or(std::cmp::Ordering::Equal)
538 });
539 merged.truncate(max_results);
540 merged
541 } else if req.session_id.is_empty() {
542 self.memory
544 .semantic_search_global(&req.query, Some(max_results), None, None)
545 .await
546 .map_err(|e| Status::internal(format!("Search failed: {e}")))?
547 } else {
548 let session_id = parse_uuid(&req.session_id)?;
550 self.memory
551 .semantic_search_session(session_id, &req.query, Some(max_results), None, None)
552 .await
553 .map_err(|e| Status::internal(format!("Search failed: {e}")))?
554 };
555
556 let min_score = if req.min_score > 0.0 {
557 req.min_score
558 } else {
559 0.0
560 };
561
562 let results: Vec<SearchResult> = search_results
563 .into_iter()
564 .filter(|r| r.combined_score >= min_score)
565 .map(|r| SearchResult {
566 entry_id: r.content_id,
567 content: r.text_content,
568 score: r.combined_score,
569 session_id: r.session_id.to_string(),
570 content_type: format!("{:?}", r.content_type),
571 metadata: std::collections::HashMap::new(),
572 })
573 .collect();
574
575 let total_matches = results.len() as u32;
576 Ok(Response::new(SemanticSearchResponse {
577 results,
578 total_matches,
579 }))
580 }
581
582 #[cfg(not(feature = "embeddings"))]
583 {
584 Err(Status::unimplemented(
585 "Semantic search requires the 'embeddings' feature",
586 ))
587 }
588 }
589
590 async fn find_related_content(
591 &self,
592 request: Request<FindRelatedContentRequest>,
593 ) -> Result<Response<FindRelatedContentResponse>, Status> {
594 let req = request.into_inner();
595 debug!(
596 "gRPC FindRelatedContent: session={}, topic='{}'",
597 req.session_id, req.topic
598 );
599
600 if req.topic.is_empty() {
601 return Err(Status::invalid_argument("topic cannot be empty"));
602 }
603
604 #[cfg(feature = "embeddings")]
605 {
606 let session_id = parse_uuid(&req.session_id)?;
607 let max_results = req.limit.unwrap_or(10) as usize;
608
609 let search_results = self
610 .memory
611 .semantic_search_session(session_id, &req.topic, Some(max_results), None, None)
612 .await
613 .map_err(|e| Status::internal(format!("Search failed: {e}")))?;
614
615 let results: Vec<SearchResult> = search_results
616 .into_iter()
617 .map(|r| SearchResult {
618 entry_id: r.content_id,
619 content: r.text_content,
620 score: r.combined_score,
621 session_id: r.session_id.to_string(),
622 content_type: format!("{:?}", r.content_type),
623 metadata: std::collections::HashMap::new(),
624 })
625 .collect();
626
627 Ok(Response::new(FindRelatedContentResponse { results }))
628 }
629
630 #[cfg(not(feature = "embeddings"))]
631 {
632 Err(Status::unimplemented(
633 "FindRelatedContent requires the 'embeddings' feature",
634 ))
635 }
636 }
637
638 async fn vectorize_session(
639 &self,
640 request: Request<VectorizeSessionRequest>,
641 ) -> Result<Response<VectorizeSessionResponse>, Status> {
642 let req = request.into_inner();
643 debug!("gRPC VectorizeSession: session_id={}", req.session_id);
644
645 let session_id = parse_uuid(&req.session_id)?;
646
647 #[cfg(feature = "embeddings")]
648 {
649 match self.memory.vectorize_session(session_id).await {
650 Ok(vectors_created) => Ok(Response::new(VectorizeSessionResponse {
651 success: true,
652 vectors_created: vectors_created as u32,
653 })),
654 Err(e) => {
655 error!("gRPC VectorizeSession failed: {}", e);
656 Err(Status::internal(e))
657 }
658 }
659 }
660
661 #[cfg(not(feature = "embeddings"))]
662 {
663 Err(Status::unimplemented(
664 "VectorizeSession requires the 'embeddings' feature",
665 ))
666 }
667 }
668
669 async fn get_vectorization_stats(
670 &self,
671 _request: Request<GetVectorizationStatsRequest>,
672 ) -> Result<Response<TextResponse>, Status> {
673 debug!("gRPC GetVectorizationStats");
674
675 #[cfg(feature = "embeddings")]
676 {
677 match self.memory.get_vectorization_stats() {
678 Ok(stats) => {
679 let text =
680 serde_json::to_string_pretty(&stats).unwrap_or_else(|_| "{}".to_string());
681 Ok(Response::new(TextResponse { text }))
682 }
683 Err(e) => {
684 error!("gRPC GetVectorizationStats failed: {}", e);
685 Err(Status::internal(e))
686 }
687 }
688 }
689
690 #[cfg(not(feature = "embeddings"))]
691 {
692 Err(Status::unimplemented(
693 "GetVectorizationStats requires the 'embeddings' feature",
694 ))
695 }
696 }
697
698 async fn get_structured_summary(
701 &self,
702 request: Request<GetStructuredSummaryRequest>,
703 ) -> Result<Response<TextResponse>, Status> {
704 let req = request.into_inner();
705 debug!("gRPC GetStructuredSummary: session_id={}", req.session_id);
706
707 let session_id = parse_uuid(&req.session_id)?;
708 let session_arc = self
709 .memory
710 .get_session(session_id)
711 .await
712 .map_err(Status::not_found)?;
713 let session = session_arc.load();
714
715 use post_cortex_core::summary::{SummaryGenerator, SummaryOptions};
716 let user_requested_compact = req.compact.unwrap_or(false);
717 const MAX_TOKENS: usize = 50_000;
718
719 let (_estimated_tokens, should_compact) =
720 SummaryGenerator::estimate_summary_size(&session, MAX_TOKENS);
721 let auto_compacted = !user_requested_compact && should_compact;
722
723 let options = if user_requested_compact || auto_compacted {
724 SummaryOptions::compact()
725 } else {
726 SummaryOptions {
727 decisions_limit: req.decisions_limit.map(|v| v as usize),
728 entities_limit: req.entities_limit.map(|v| v as usize),
729 questions_limit: req.questions_limit.map(|v| v as usize),
730 concepts_limit: req.concepts_limit.map(|v| v as usize),
731 min_confidence: req.min_confidence,
732 compact: false,
733 }
734 };
735
736 let summary = SummaryGenerator::generate_structured_summary_filtered(&session, &options);
737 let text =
738 serde_json::to_string_pretty(&summary).unwrap_or_else(|_| format!("{summary:?}"));
739
740 Ok(Response::new(TextResponse { text }))
741 }
742
743 async fn get_key_decisions(
744 &self,
745 request: Request<GetKeyDecisionsRequest>,
746 ) -> Result<Response<TextResponse>, Status> {
747 let req = request.into_inner();
748 debug!("gRPC GetKeyDecisions: session_id={}", req.session_id);
749
750 let session_id = parse_uuid(&req.session_id)?;
751 let session_arc = self
752 .memory
753 .get_session(session_id)
754 .await
755 .map_err(Status::not_found)?;
756 let session = session_arc.load();
757
758 use post_cortex_core::summary::SummaryGenerator;
759 let decisions = SummaryGenerator::extract_decision_timeline(&session);
760 let text =
761 serde_json::to_string_pretty(&decisions).unwrap_or_else(|_| format!("{decisions:?}"));
762
763 Ok(Response::new(TextResponse { text }))
764 }
765
766 async fn get_key_insights(
767 &self,
768 request: Request<GetKeyInsightsRequest>,
769 ) -> Result<Response<TextResponse>, Status> {
770 let req = request.into_inner();
771 debug!("gRPC GetKeyInsights: session_id={}", req.session_id);
772
773 let session_id = parse_uuid(&req.session_id)?;
774 let session_arc = self
775 .memory
776 .get_session(session_id)
777 .await
778 .map_err(Status::not_found)?;
779 let session = session_arc.load();
780
781 use post_cortex_core::summary::SummaryGenerator;
782 let insights = SummaryGenerator::extract_key_insights(
783 &session,
784 req.limit.map(|v| v as usize).unwrap_or(5),
785 );
786 let text =
787 serde_json::to_string_pretty(&insights).unwrap_or_else(|_| format!("{insights:?}"));
788
789 Ok(Response::new(TextResponse { text }))
790 }
791
792 async fn get_entity_importance(
793 &self,
794 request: Request<GetEntityImportanceRequest>,
795 ) -> Result<Response<TextResponse>, Status> {
796 let req = request.into_inner();
797 debug!("gRPC GetEntityImportance: session_id={}", req.session_id);
798
799 let session_id = parse_uuid(&req.session_id)?;
800 let session_arc = self
801 .memory
802 .get_session(session_id)
803 .await
804 .map_err(Status::not_found)?;
805 let session = session_arc.load();
806
807 let mut analysis = session.entity_graph.analyze_entity_importance();
808 if let Some(min_imp) = req.min_importance {
809 analysis.retain(|a| a.importance_score >= min_imp);
810 }
811 if let Some(limit) = req.limit {
812 analysis.truncate(limit as usize);
813 }
814 let text =
815 serde_json::to_string_pretty(&analysis).unwrap_or_else(|_| format!("{analysis:?}"));
816
817 Ok(Response::new(TextResponse { text }))
818 }
819
820 async fn get_entity_network(
821 &self,
822 request: Request<GetEntityNetworkRequest>,
823 ) -> Result<Response<TextResponse>, Status> {
824 let req = request.into_inner();
825 debug!("gRPC GetEntityNetwork: session_id={}", req.session_id);
826
827 let session_id = parse_uuid(&req.session_id)?;
828 let session_arc = self
829 .memory
830 .get_session(session_id)
831 .await
832 .map_err(Status::not_found)?;
833 let session = session_arc.load();
834
835 let network = match req.center_entity {
836 Some(entity) => session.entity_graph.get_entity_network(&entity, 2),
837 None => {
838 let top_entities = session.entity_graph.get_most_important_entities(1);
839 if let Some(top) = top_entities.first() {
840 session.entity_graph.get_entity_network(&top.name, 2)
841 } else {
842 session.entity_graph.get_entity_network("", 2)
843 }
844 }
845 };
846 let text =
847 serde_json::to_string_pretty(&network).unwrap_or_else(|_| format!("{network:?}"));
848
849 Ok(Response::new(TextResponse { text }))
850 }
851
852 async fn get_session_statistics(
853 &self,
854 request: Request<GetSessionStatisticsRequest>,
855 ) -> Result<Response<TextResponse>, Status> {
856 let req = request.into_inner();
857 debug!("gRPC GetSessionStatistics: session_id={}", req.session_id);
858
859 let session_id = parse_uuid(&req.session_id)?;
860 let session_arc = self
861 .memory
862 .get_session(session_id)
863 .await
864 .map_err(Status::not_found)?;
865 let session = session_arc.load();
866
867 use post_cortex_core::summary::SummaryGenerator;
868 let stats = SummaryGenerator::calculate_session_stats(&session);
869 let text = serde_json::to_string_pretty(&stats).unwrap_or_else(|_| format!("{stats:?}"));
870
871 Ok(Response::new(TextResponse { text }))
872 }
873
874 async fn assemble_context(
877 &self,
878 request: Request<AssembleContextRequest>,
879 ) -> Result<Response<AssembleContextResponse>, Status> {
880 let req = request.into_inner();
881 debug!(
882 "gRPC AssembleContext: session_id={}, workspace_id={:?}, query={}",
883 req.session_id, req.workspace_id, req.query
884 );
885
886 let token_budget = if req.token_budget == 0 {
887 4000
888 } else {
889 req.token_budget as usize
890 };
891
892 use post_cortex_core::graph::entity_graph::SimpleEntityGraph;
893 use post_cortex_memory::context_assembly;
894
895 let (updates, graph) = if req
897 .workspace_id
898 .as_deref()
899 .map(|s| !s.is_empty())
900 .unwrap_or(false)
901 {
902 let workspace_id = parse_uuid(req.workspace_id.as_deref().unwrap_or(""))?;
904 let workspace = self
905 .memory
906 .workspace_manager
907 .get_workspace(&workspace_id)
908 .ok_or_else(|| Status::not_found(format!("Workspace {workspace_id} not found")))?;
909
910 let mut all_updates = Vec::new();
911 let mut merged_graph = SimpleEntityGraph::new();
912
913 for (ws_session_id, _role) in workspace.get_all_sessions() {
914 match self.memory.get_session(ws_session_id).await {
915 Ok(session_arc) => {
916 let session = session_arc.load();
917 all_updates.extend(session.hot_context.iter().iter().cloned());
918 all_updates.extend(session.warm_context.iter().map(|c| c.update.clone()));
919 merged_graph.merge_from(&session.entity_graph);
920 }
921 Err(e) => {
922 warn!(
923 "AssembleContext: could not load session {} from workspace {}: {}",
924 ws_session_id, workspace_id, e
925 );
926 }
927 }
928 }
929
930 (all_updates, merged_graph)
931 } else {
932 let session_id = parse_uuid(&req.session_id)?;
934 let session_arc = self
935 .memory
936 .get_session(session_id)
937 .await
938 .map_err(Status::not_found)?;
939 let session = session_arc.load();
940 let updates: Vec<_> = session
941 .hot_context
942 .iter()
943 .iter()
944 .chain(session.warm_context.iter().map(|c| &c.update))
945 .cloned()
946 .collect();
947 (updates, (*session.entity_graph).clone())
948 };
949
950 let assembled =
951 context_assembly::assemble_context(&req.query, &graph, &updates, token_budget);
952
953 let formatted_text = context_assembly::format_for_llm(&assembled);
954
955 let items: Vec<AssembledContextItem> = assembled
957 .items
958 .iter()
959 .map(|item| {
960 let (source, via_entity) = match &item.source {
961 context_assembly::ContextSource::SemanticMatch => {
962 ("semantic_match".to_string(), String::new())
963 }
964 context_assembly::ContextSource::GraphTraversal { via_entity } => {
965 ("graph_traversal".to_string(), via_entity.clone())
966 }
967 context_assembly::ContextSource::RecentUpdate => {
968 ("recent_update".to_string(), String::new())
969 }
970 };
971 AssembledContextItem {
972 text: item.text.clone(),
973 score: item.score,
974 source,
975 via_entity,
976 entities: item.entities.clone(),
977 token_estimate: item.token_estimate as u32,
978 entry_id: item.entry_id.clone(),
979 }
980 })
981 .collect();
982
983 let entity_context: Vec<AssembledEntityContext> = assembled
984 .entity_context
985 .iter()
986 .map(|ec| {
987 let (relevance, via_entity, via_relation) = match &ec.relevance {
988 context_assembly::EntityRelevance::DirectMention => {
989 ("direct_mention".to_string(), String::new(), String::new())
990 }
991 context_assembly::EntityRelevance::GraphNeighbor { via, relation } => {
992 ("graph_neighbor".to_string(), via.clone(), relation.clone())
993 }
994 };
995 let relationships: Vec<EntityRelation> = ec
996 .relationships
997 .iter()
998 .map(|r| EntityRelation {
999 from_entity: r.from_entity.clone(),
1000 to_entity: r.to_entity.clone(),
1001 relation_type: format!("{:?}", r.relation_type),
1002 context: r.context.clone(),
1003 })
1004 .collect();
1005 AssembledEntityContext {
1006 name: ec.name.clone(),
1007 relevance,
1008 via_entity,
1009 via_relation,
1010 relationships,
1011 }
1012 })
1013 .collect();
1014
1015 let impact: Vec<pb::ImpactEntry> = assembled
1016 .impact
1017 .iter()
1018 .map(|i| pb::ImpactEntry {
1019 entity: i.entity.clone(),
1020 depends_on: i.depends_on.clone(),
1021 relation_type: format!("{:?}", i.relation_type),
1022 context: i.context.clone(),
1023 })
1024 .collect();
1025
1026 Ok(Response::new(AssembleContextResponse {
1027 items,
1028 entity_context,
1029 impact,
1030 total_tokens: assembled.total_tokens as u32,
1031 formatted_text,
1032 }))
1033 }
1034
1035 async fn create_workspace(
1038 &self,
1039 request: Request<CreateWorkspaceRequest>,
1040 ) -> Result<Response<CreateWorkspaceResponse>, Status> {
1041 let req = request.into_inner();
1042 debug!("gRPC CreateWorkspace: name={}", req.name);
1043
1044 let workspace_id = self
1045 .memory
1046 .workspace_manager
1047 .create_workspace(req.name.clone(), req.description.clone());
1048
1049 if let Err(e) = self
1051 .memory
1052 .save_workspace_metadata(workspace_id, &req.name, &req.description, &[])
1053 .await
1054 {
1055 warn!("gRPC CreateWorkspace: failed to persist metadata: {}", e);
1056 }
1057
1058 Ok(Response::new(CreateWorkspaceResponse {
1059 workspace_id: workspace_id.to_string(),
1060 }))
1061 }
1062
1063 async fn get_workspace(
1064 &self,
1065 request: Request<GetWorkspaceRequest>,
1066 ) -> Result<Response<GetWorkspaceResponse>, Status> {
1067 let req = request.into_inner();
1068 debug!("gRPC GetWorkspace: workspace_id={}", req.workspace_id);
1069
1070 let workspace_id = parse_uuid(&req.workspace_id)?;
1071
1072 let workspace = self
1073 .memory
1074 .workspace_manager
1075 .get_workspace(&workspace_id)
1076 .ok_or_else(|| Status::not_found(format!("Workspace {} not found", workspace_id)))?;
1077
1078 let info = workspace_to_info(&workspace);
1079
1080 Ok(Response::new(GetWorkspaceResponse {
1081 workspace: Some(info),
1082 }))
1083 }
1084
1085 async fn list_workspaces(
1086 &self,
1087 _request: Request<ListWorkspacesRequest>,
1088 ) -> Result<Response<ListWorkspacesResponse>, Status> {
1089 debug!("gRPC ListWorkspaces");
1090
1091 let workspaces = self.memory.workspace_manager.list_workspaces();
1092
1093 let infos: Vec<WorkspaceInfo> = workspaces.iter().map(|ws| workspace_to_info(ws)).collect();
1094
1095 Ok(Response::new(ListWorkspacesResponse { workspaces: infos }))
1096 }
1097
1098 async fn delete_workspace(
1099 &self,
1100 request: Request<DeleteWorkspaceRequest>,
1101 ) -> Result<Response<DeleteWorkspaceResponse>, Status> {
1102 let req = request.into_inner();
1103 debug!("gRPC DeleteWorkspace: workspace_id={}", req.workspace_id);
1104
1105 let workspace_id = parse_uuid(&req.workspace_id)?;
1106
1107 let deleted = self
1108 .memory
1109 .workspace_manager
1110 .delete_workspace(&workspace_id);
1111
1112 let success = deleted.is_some();
1113 Ok(Response::new(DeleteWorkspaceResponse { success }))
1114 }
1115
1116 async fn add_session_to_workspace(
1117 &self,
1118 request: Request<AddSessionToWorkspaceRequest>,
1119 ) -> Result<Response<AddSessionToWorkspaceResponse>, Status> {
1120 let req = request.into_inner();
1121 debug!(
1122 "gRPC AddSessionToWorkspace: workspace={}, session={}, role={}",
1123 req.workspace_id, req.session_id, req.role
1124 );
1125
1126 let workspace_id = parse_uuid(&req.workspace_id)?;
1127 let session_id = parse_uuid(&req.session_id)?;
1128 let role = parse_session_role(&req.role);
1129
1130 self.memory
1131 .workspace_manager
1132 .add_session_to_workspace(&workspace_id, session_id, role)
1133 .map_err(Status::not_found)?;
1134
1135 if let Some(ws) = self.memory.workspace_manager.get_workspace(&workspace_id) {
1137 let session_ids: Vec<Uuid> = ws.session_ids.iter().map(|entry| *entry.key()).collect();
1138 if let Err(e) = self
1139 .memory
1140 .save_workspace_metadata(workspace_id, &ws.name, &ws.description, &session_ids)
1141 .await
1142 {
1143 warn!(
1144 "gRPC AddSessionToWorkspace: failed to persist metadata: {}",
1145 e
1146 );
1147 }
1148 }
1149
1150 Ok(Response::new(AddSessionToWorkspaceResponse {
1151 success: true,
1152 }))
1153 }
1154
1155 async fn remove_session_from_workspace(
1156 &self,
1157 request: Request<RemoveSessionFromWorkspaceRequest>,
1158 ) -> Result<Response<RemoveSessionFromWorkspaceResponse>, Status> {
1159 let req = request.into_inner();
1160 debug!(
1161 "gRPC RemoveSessionFromWorkspace: workspace={}, session={}",
1162 req.workspace_id, req.session_id
1163 );
1164
1165 let workspace_id = parse_uuid(&req.workspace_id)?;
1166 let session_id = parse_uuid(&req.session_id)?;
1167
1168 self.memory
1169 .workspace_manager
1170 .remove_session_from_workspace(&workspace_id, &session_id)
1171 .map_err(Status::not_found)?;
1172
1173 if let Some(ws) = self.memory.workspace_manager.get_workspace(&workspace_id) {
1175 let session_ids: Vec<Uuid> = ws.session_ids.iter().map(|entry| *entry.key()).collect();
1176 if let Err(e) = self
1177 .memory
1178 .save_workspace_metadata(workspace_id, &ws.name, &ws.description, &session_ids)
1179 .await
1180 {
1181 warn!(
1182 "gRPC RemoveSessionFromWorkspace: failed to persist metadata: {}",
1183 e
1184 );
1185 }
1186 }
1187
1188 Ok(Response::new(RemoveSessionFromWorkspaceResponse {
1189 success: true,
1190 }))
1191 }
1192
1193 async fn register_source(
1196 &self,
1197 request: Request<RegisterSourceRequest>,
1198 ) -> Result<Response<RegisterSourceAck>, Status> {
1199 self.register_source_impl(request).await
1200 }
1201
1202 async fn register_source_batch(
1203 &self,
1204 request: Request<RegisterSourceBatchRequest>,
1205 ) -> Result<Response<RegisterSourceBatchAck>, Status> {
1206 self.register_source_batch_impl(request).await
1207 }
1208
1209 async fn check_freshness(
1210 &self,
1211 request: Request<FreshnessRequest>,
1212 ) -> Result<Response<FreshnessReport>, Status> {
1213 self.check_freshness_impl(request).await
1214 }
1215
1216 async fn invalidate(
1217 &self,
1218 request: Request<InvalidateRequest>,
1219 ) -> Result<Response<InvalidateAck>, Status> {
1220 self.invalidate_impl(request).await
1221 }
1222
1223 async fn register_symbol_dependency(
1224 &self,
1225 request: Request<RegisterSymbolDependencyRequest>,
1226 ) -> Result<Response<RegisterSymbolDependencyAck>, Status> {
1227 self.register_symbol_dependency_impl(request).await
1228 }
1229
1230 async fn cascade_invalidate(
1231 &self,
1232 request: Request<CascadeInvalidateRequest>,
1233 ) -> Result<Response<CascadeInvalidateReport>, Status> {
1234 self.cascade_invalidate_impl(request).await
1235 }
1236
1237 async fn get_stale_entries_by_source(
1238 &self,
1239 request: Request<GetStaleEntriesBySourceRequest>,
1240 ) -> Result<Response<GetStaleEntriesBySourceResponse>, Status> {
1241 self.get_stale_entries_by_source_impl(request).await
1242 }
1243}
1244
1245pub async fn start_grpc_server(
1248 memory: Arc<ConversationMemorySystem>,
1249 addr: std::net::SocketAddr,
1250) -> Result<(), String> {
1251 let service = PcxGrpcService::new(memory);
1252
1253 info!("Starting gRPC server on {}", addr);
1254
1255 tonic::transport::Server::builder()
1256 .add_service(service.into_server())
1257 .serve(addr)
1258 .await
1259 .map_err(|e| format!("gRPC server error: {e}"))
1260}