Skip to main content

post_cortex_daemon/daemon/grpc_service/
mod.rs

1// Copyright (c) 2025, 2026 Julius ML
2// MIT License
3
4//! gRPC service for Post-Cortex.
5//!
6//! Provides a tonic gRPC interface to the memory system, enabling
7//! native binary protocol access for coding agents like Axon.
8//!
9//! Layout: the canonical `impl PostCortex for PcxGrpcService` block lives in
10//! this file. Freshness-tracking methods (Phase 9) delegate to inherent
11//! `*_impl` methods in `freshness` to keep this file scoped to the
12//! interactive surface. Shared parsing/validation helpers live in
13//! `helpers`.
14
15use post_cortex_core::services::PostCortexService;
16use post_cortex_memory::ConversationMemorySystem;
17use post_cortex_memory::services::MemoryServiceImpl;
18use post_cortex_storage::rocksdb_storage::SessionCheckpoint;
19use std::sync::Arc;
20use tonic::{Request, Response, Status};
21use tracing::{debug, error, info, warn};
22use uuid::Uuid;
23
24// Phase 2 of the workspace refactor moved the proto-generated bindings into
25// the dedicated `post-cortex-proto` crate. The re-export below preserves the
26// long-standing `crate::daemon::grpc_service::pb::…` path for in-tree
27// callers inside `src/daemon/` while every consumer in `src/core/` and
28// `src/storage/` now imports from `post_cortex_proto::pb` directly.
29pub use post_cortex_proto::pb;
30
31mod freshness;
32mod helpers;
33
34use helpers::{
35    parse_session_role, parse_uuid, proto_bulk_item_to_request, proto_update_to_request,
36    system_error_to_status, workspace_to_info,
37};
38use pb::post_cortex_server::{PostCortex, PostCortexServer};
39use pb::*;
40
41/// gRPC service backed by ConversationMemorySystem.
42///
43/// Holds both the raw [`ConversationMemorySystem`] (used by handlers
44/// that have not yet migrated to the canonical trait) and the
45/// canonical [`MemoryServiceImpl`]. Newly-migrated handlers — currently
46/// `update_context` / `bulk_update_context` — translate proto types
47/// into [`post_cortex_core::services`] requests and delegate to
48/// `self.service`, so validation lives in exactly one place.
49pub struct PcxGrpcService {
50    pub(super) memory: Arc<ConversationMemorySystem>,
51    pub(super) service: Arc<MemoryServiceImpl>,
52}
53
54impl PcxGrpcService {
55    /// Wrap a shared memory system in a new gRPC service handle.
56    pub fn new(memory: Arc<ConversationMemorySystem>) -> Self {
57        let service = Arc::new(MemoryServiceImpl::new(memory.clone()));
58        Self { memory, service }
59    }
60
61    /// Build a gRPC service from an existing canonical service so several
62    /// transports can share the same background pipeline (Phase 11).
63    pub fn from_service(service: Arc<MemoryServiceImpl>) -> Self {
64        let memory = service.inner().clone();
65        Self { memory, service }
66    }
67
68    /// Consume `self` and return a tonic [`PostCortexServer`] ready to serve.
69    pub fn into_server(self) -> PostCortexServer<Self> {
70        PostCortexServer::new(self)
71    }
72}
73
74#[tonic::async_trait]
75impl PostCortex for PcxGrpcService {
76    async fn health(
77        &self,
78        _request: Request<HealthRequest>,
79    ) -> Result<Response<HealthResponse>, Status> {
80        let health = self.memory.get_system_health();
81
82        Ok(Response::new(HealthResponse {
83            healthy: !health.circuit_breaker_open,
84            version: env!("CARGO_PKG_VERSION").to_string(),
85            active_sessions: health.active_sessions as u64,
86            total_updates: health.total_requests,
87            embeddings_enabled: self.memory.embeddings_enabled(),
88        }))
89    }
90
91    async fn create_session(
92        &self,
93        request: Request<CreateSessionRequest>,
94    ) -> Result<Response<CreateSessionResponse>, Status> {
95        let req = request.into_inner();
96        debug!("gRPC CreateSession: name={}", req.name);
97
98        let name = if req.name.is_empty() {
99            None
100        } else {
101            Some(req.name)
102        };
103        let description = if req.description.is_empty() {
104            None
105        } else {
106            Some(req.description)
107        };
108
109        match self.memory.create_session(name, description).await {
110            Ok(session_id) => Ok(Response::new(CreateSessionResponse {
111                session_id: session_id.to_string(),
112            })),
113            Err(e) => {
114                error!("gRPC CreateSession failed: {}", e);
115                Err(Status::internal(e))
116            }
117        }
118    }
119
120    async fn list_sessions(
121        &self,
122        request: Request<ListSessionsRequest>,
123    ) -> Result<Response<ListSessionsResponse>, Status> {
124        let req = request.into_inner();
125
126        let session_ids = if !req.name_filter.is_empty() {
127            self.memory
128                .find_sessions_by_name_or_description(&req.name_filter)
129                .await
130                .map_err(Status::internal)?
131        } else {
132            self.memory
133                .list_sessions()
134                .await
135                .map_err(Status::internal)?
136        };
137
138        let limit = if req.limit > 0 {
139            req.limit as usize
140        } else {
141            100
142        };
143
144        let mut sessions = Vec::new();
145        for session_id in session_ids.into_iter().take(limit) {
146            if let Ok(session_arc) = self.memory.get_session(session_id).await {
147                let session = session_arc.load();
148                sessions.push(SessionInfo {
149                    session_id: session_id.to_string(),
150                    name: session.name().unwrap_or_default(),
151                    description: session.description().unwrap_or_default(),
152                    created_at_unix: session.created_at().timestamp(),
153                    update_count: session.incremental_updates.len() as u32,
154                });
155            }
156        }
157
158        Ok(Response::new(ListSessionsResponse { sessions }))
159    }
160
161    async fn load_session(
162        &self,
163        request: Request<LoadSessionRequest>,
164    ) -> Result<Response<LoadSessionResponse>, Status> {
165        let req = request.into_inner();
166        debug!("gRPC LoadSession: session_id={}", req.session_id);
167
168        let session_id = parse_uuid(&req.session_id)?;
169
170        let session_arc = self
171            .memory
172            .get_session(session_id)
173            .await
174            .map_err(Status::not_found)?;
175        let session = session_arc.load();
176
177        let info = SessionInfo {
178            session_id: session_id.to_string(),
179            name: session.name().unwrap_or_default(),
180            description: session.description().unwrap_or_default(),
181            created_at_unix: session.created_at().timestamp(),
182            update_count: session.incremental_updates.len() as u32,
183        };
184
185        Ok(Response::new(LoadSessionResponse {
186            session: Some(info),
187        }))
188    }
189
190    async fn search_sessions(
191        &self,
192        request: Request<SearchSessionsRequest>,
193    ) -> Result<Response<SearchSessionsResponse>, Status> {
194        let req = request.into_inner();
195        debug!("gRPC SearchSessions: query={}", req.query);
196
197        if req.query.is_empty() {
198            return Err(Status::invalid_argument("query cannot be empty"));
199        }
200
201        let session_ids = self
202            .memory
203            .find_sessions_by_name_or_description(&req.query)
204            .await
205            .map_err(Status::internal)?;
206
207        let mut sessions = Vec::new();
208        for session_id in session_ids {
209            if let Ok(session_arc) = self.memory.get_session(session_id).await {
210                let session = session_arc.load();
211                sessions.push(SessionInfo {
212                    session_id: session_id.to_string(),
213                    name: session.name().unwrap_or_default(),
214                    description: session.description().unwrap_or_default(),
215                    created_at_unix: session.created_at().timestamp(),
216                    update_count: session.incremental_updates.len() as u32,
217                });
218            }
219        }
220
221        Ok(Response::new(SearchSessionsResponse { sessions }))
222    }
223
224    async fn update_session_metadata(
225        &self,
226        request: Request<UpdateSessionMetadataRequest>,
227    ) -> Result<Response<UpdateSessionMetadataResponse>, Status> {
228        let req = request.into_inner();
229        debug!("gRPC UpdateSessionMetadata: session_id={}", req.session_id);
230
231        let session_id = parse_uuid(&req.session_id)?;
232
233        match self
234            .memory
235            .update_session_metadata(session_id, req.name, req.description)
236            .await
237        {
238            Ok(()) => Ok(Response::new(UpdateSessionMetadataResponse {
239                success: true,
240            })),
241            Err(e) => {
242                error!("gRPC UpdateSessionMetadata failed: {}", e);
243                Err(Status::internal(e))
244            }
245        }
246    }
247
248    async fn delete_session(
249        &self,
250        request: Request<DeleteSessionRequest>,
251    ) -> Result<Response<DeleteSessionResponse>, Status> {
252        let req = request.into_inner();
253        debug!("gRPC DeleteSession: session_id={}", req.session_id);
254
255        let session_id = parse_uuid(&req.session_id)?;
256
257        match self.memory.delete_session(session_id).await {
258            Ok(success) => Ok(Response::new(DeleteSessionResponse { success })),
259            Err(e) => {
260                error!("gRPC DeleteSession failed: {}", e);
261                Err(Status::internal(e))
262            }
263        }
264    }
265
266    async fn delete_entity(
267        &self,
268        request: Request<DeleteEntityRequest>,
269    ) -> Result<Response<DeleteEntityResponse>, Status> {
270        let req = request.into_inner();
271        debug!(
272            "gRPC DeleteEntity: session_id={} entity={}",
273            req.session_id, req.entity_name
274        );
275
276        if req.entity_name.is_empty() {
277            return Err(Status::invalid_argument("entity_name must not be empty"));
278        }
279        let session_id = parse_uuid(&req.session_id)?;
280
281        match self
282            .memory
283            .delete_entity(session_id, &req.entity_name)
284            .await
285        {
286            Ok(existed) => Ok(Response::new(DeleteEntityResponse { existed })),
287            Err(e) => {
288                error!("gRPC DeleteEntity failed: {}", e);
289                Err(Status::internal(e))
290            }
291        }
292    }
293
294    async fn update_context(
295        &self,
296        request: Request<UpdateContextRequest>,
297    ) -> Result<Response<UpdateContextResponse>, Status> {
298        let req = request.into_inner();
299        debug!(
300            "gRPC UpdateContext: session={}, type={}",
301            req.session_id, req.interaction_type
302        );
303
304        let service_req = proto_update_to_request(req)?;
305        let resp = self
306            .service
307            .update_context(service_req)
308            .await
309            .map_err(|e| {
310                error!("gRPC UpdateContext failed: {}", e);
311                system_error_to_status(e)
312            })?;
313
314        Ok(Response::new(UpdateContextResponse {
315            update_id: resp.entry_id.to_string(),
316            success: resp.durable,
317        }))
318    }
319
320    async fn bulk_update_context(
321        &self,
322        request: Request<BulkUpdateContextRequest>,
323    ) -> Result<Response<BulkUpdateContextResponse>, Status> {
324        let req = request.into_inner();
325        debug!(
326            "gRPC BulkUpdateContext: session={}, count={}",
327            req.session_id,
328            req.updates.len()
329        );
330
331        // Translate every proto item to its canonical request, propagating
332        // session_id from the batch envelope (proto items don't carry it).
333        let batch_session_id = parse_uuid(&req.session_id)?;
334        let mut update_ids = Vec::new();
335        let mut success_count = 0u32;
336        let mut failure_count = 0u32;
337
338        for item in req.updates {
339            let service_req = match proto_bulk_item_to_request(batch_session_id, item) {
340                Ok(r) => r,
341                Err(status) => {
342                    warn!(
343                        "gRPC BulkUpdateContext item translation failed: {}",
344                        status.message()
345                    );
346                    failure_count += 1;
347                    continue;
348                }
349            };
350
351            match self.service.update_context(service_req).await {
352                Ok(resp) => {
353                    update_ids.push(resp.entry_id.to_string());
354                    success_count += 1;
355                }
356                Err(e) => {
357                    error!("gRPC BulkUpdateContext item failed: {}", e);
358                    failure_count += 1;
359                }
360            }
361        }
362
363        Ok(Response::new(BulkUpdateContextResponse {
364            update_ids,
365            success_count,
366            failure_count,
367        }))
368    }
369
370    async fn create_checkpoint(
371        &self,
372        request: Request<CreateCheckpointRequest>,
373    ) -> Result<Response<CreateCheckpointResponse>, Status> {
374        let req = request.into_inner();
375        debug!("gRPC CreateCheckpoint: session_id={}", req.session_id);
376
377        let session_id = parse_uuid(&req.session_id)?;
378
379        let session_arc = self
380            .memory
381            .get_session(session_id)
382            .await
383            .map_err(Status::not_found)?;
384        let session = session_arc.load();
385
386        let checkpoint = SessionCheckpoint {
387            id: Uuid::new_v4(),
388            session_id,
389            created_at: chrono::Utc::now(),
390            structured_context: (*session.current_state).clone(),
391            recent_updates: (*session.incremental_updates).clone(),
392            code_references: (*session.code_references).clone(),
393            change_history: (*session.change_history).clone(),
394            total_updates: session.incremental_updates.len(),
395            context_quality_score: 1.0,
396            compression_ratio: 1.0,
397        };
398        let checkpoint_id = checkpoint.id.to_string();
399
400        match self.memory.storage_actor.save_checkpoint(&checkpoint).await {
401            Ok(()) => Ok(Response::new(CreateCheckpointResponse {
402                checkpoint_id,
403                success: true,
404            })),
405            Err(e) => {
406                error!("gRPC CreateCheckpoint failed: {}", e);
407                Err(Status::internal(e))
408            }
409        }
410    }
411
412    async fn query_context(
413        &self,
414        request: Request<QueryContextRequest>,
415    ) -> Result<Response<QueryContextResponse>, Status> {
416        let req = request.into_inner();
417        let session_id = parse_uuid(&req.session_id)?;
418
419        let session_arc = self
420            .memory
421            .get_session(session_id)
422            .await
423            .map_err(Status::not_found)?;
424        let session = session_arc.load();
425
426        let limit = if req.limit > 0 {
427            req.limit as usize
428        } else {
429            50
430        };
431
432        let updates: Vec<ContextUpdateEntry> = session
433            .incremental_updates
434            .iter()
435            .filter(|u| {
436                if !req.interaction_type.is_empty() {
437                    let ut = format!("{:?}", u.update_type);
438                    ut.to_lowercase()
439                        .contains(&req.interaction_type.to_lowercase())
440                } else {
441                    true
442                }
443            })
444            .filter(|u| {
445                if req.after_unix > 0 {
446                    u.timestamp.timestamp() > req.after_unix
447                } else {
448                    true
449                }
450            })
451            .take(limit)
452            .map(|u| ContextUpdateEntry {
453                id: u.id.to_string(),
454                interaction_type: format!("{:?}", u.update_type),
455                content: Some(ContextContent {
456                    title: u.content.title.clone(),
457                    description: u.content.description.clone(),
458                    details: u.content.details.clone(),
459                    examples: u.content.examples.clone(),
460                    implications: u.content.implications.clone(),
461                    code_ref: u.related_code.as_ref().map(|c| CodeReference {
462                        file_path: c.file_path.clone(),
463                        start_line: c.start_line,
464                        end_line: c.end_line,
465                        code_snippet: c.code_snippet.clone(),
466                        commit_hash: c.commit_hash.clone().unwrap_or_default(),
467                        branch: c.branch.clone().unwrap_or_default(),
468                        change_description: c.change_description.clone(),
469                    }),
470                }),
471                timestamp_unix: u.timestamp.timestamp(),
472                entities: u.creates_entities.clone(),
473                source_ref: None, // Source tracking not yet wired
474            })
475            .collect();
476
477        let total = updates.len() as u32;
478        Ok(Response::new(QueryContextResponse { updates, total }))
479    }
480
481    async fn semantic_search(
482        &self,
483        request: Request<SemanticSearchRequest>,
484    ) -> Result<Response<SemanticSearchResponse>, Status> {
485        let req = request.into_inner();
486        debug!(
487            "gRPC SemanticSearch: query='{}', session='{}', workspace='{:?}'",
488            req.query, req.session_id, req.workspace_id
489        );
490
491        if req.query.is_empty() {
492            return Err(Status::invalid_argument("query cannot be empty"));
493        }
494
495        let max_results = if req.max_results > 0 {
496            req.max_results as usize
497        } else {
498            10
499        };
500
501        #[cfg(feature = "embeddings")]
502        {
503            // workspace_id takes precedence: search each session independently and merge results
504            let workspace_id_str = req.workspace_id.as_deref().unwrap_or("");
505            let search_results = if !workspace_id_str.is_empty() {
506                // Workspace-scoped: search all sessions in the workspace and merge
507                let workspace_id = parse_uuid(workspace_id_str)?;
508                let workspace = self
509                    .memory
510                    .workspace_manager
511                    .get_workspace(&workspace_id)
512                    .ok_or_else(|| {
513                        Status::not_found(format!("Workspace {workspace_id} not found"))
514                    })?;
515
516                let session_ids: Vec<uuid::Uuid> = workspace
517                    .get_all_sessions()
518                    .into_iter()
519                    .map(|(sid, _)| sid)
520                    .collect();
521
522                let mut merged = Vec::new();
523                for sid in session_ids {
524                    match self
525                        .memory
526                        .semantic_search_session(sid, &req.query, Some(max_results), None, None)
527                        .await
528                    {
529                        Ok(results) => merged.extend(results),
530                        Err(e) => warn!("SemanticSearch: session {} search failed: {}", sid, e),
531                    }
532                }
533                // Sort by combined_score descending and cap at max_results
534                merged.sort_by(|a, b| {
535                    b.combined_score
536                        .partial_cmp(&a.combined_score)
537                        .unwrap_or(std::cmp::Ordering::Equal)
538                });
539                merged.truncate(max_results);
540                merged
541            } else if req.session_id.is_empty() {
542                // Global search
543                self.memory
544                    .semantic_search_global(&req.query, Some(max_results), None, None)
545                    .await
546                    .map_err(|e| Status::internal(format!("Search failed: {e}")))?
547            } else {
548                // Session-scoped search
549                let session_id = parse_uuid(&req.session_id)?;
550                self.memory
551                    .semantic_search_session(session_id, &req.query, Some(max_results), None, None)
552                    .await
553                    .map_err(|e| Status::internal(format!("Search failed: {e}")))?
554            };
555
556            let min_score = if req.min_score > 0.0 {
557                req.min_score
558            } else {
559                0.0
560            };
561
562            let results: Vec<SearchResult> = search_results
563                .into_iter()
564                .filter(|r| r.combined_score >= min_score)
565                .map(|r| SearchResult {
566                    entry_id: r.content_id,
567                    content: r.text_content,
568                    score: r.combined_score,
569                    session_id: r.session_id.to_string(),
570                    content_type: format!("{:?}", r.content_type),
571                    metadata: std::collections::HashMap::new(),
572                })
573                .collect();
574
575            let total_matches = results.len() as u32;
576            Ok(Response::new(SemanticSearchResponse {
577                results,
578                total_matches,
579            }))
580        }
581
582        #[cfg(not(feature = "embeddings"))]
583        {
584            Err(Status::unimplemented(
585                "Semantic search requires the 'embeddings' feature",
586            ))
587        }
588    }
589
590    async fn find_related_content(
591        &self,
592        request: Request<FindRelatedContentRequest>,
593    ) -> Result<Response<FindRelatedContentResponse>, Status> {
594        let req = request.into_inner();
595        debug!(
596            "gRPC FindRelatedContent: session={}, topic='{}'",
597            req.session_id, req.topic
598        );
599
600        if req.topic.is_empty() {
601            return Err(Status::invalid_argument("topic cannot be empty"));
602        }
603
604        #[cfg(feature = "embeddings")]
605        {
606            let session_id = parse_uuid(&req.session_id)?;
607            let max_results = req.limit.unwrap_or(10) as usize;
608
609            let search_results = self
610                .memory
611                .semantic_search_session(session_id, &req.topic, Some(max_results), None, None)
612                .await
613                .map_err(|e| Status::internal(format!("Search failed: {e}")))?;
614
615            let results: Vec<SearchResult> = search_results
616                .into_iter()
617                .map(|r| SearchResult {
618                    entry_id: r.content_id,
619                    content: r.text_content,
620                    score: r.combined_score,
621                    session_id: r.session_id.to_string(),
622                    content_type: format!("{:?}", r.content_type),
623                    metadata: std::collections::HashMap::new(),
624                })
625                .collect();
626
627            Ok(Response::new(FindRelatedContentResponse { results }))
628        }
629
630        #[cfg(not(feature = "embeddings"))]
631        {
632            Err(Status::unimplemented(
633                "FindRelatedContent requires the 'embeddings' feature",
634            ))
635        }
636    }
637
638    async fn vectorize_session(
639        &self,
640        request: Request<VectorizeSessionRequest>,
641    ) -> Result<Response<VectorizeSessionResponse>, Status> {
642        let req = request.into_inner();
643        debug!("gRPC VectorizeSession: session_id={}", req.session_id);
644
645        let session_id = parse_uuid(&req.session_id)?;
646
647        #[cfg(feature = "embeddings")]
648        {
649            match self.memory.vectorize_session(session_id).await {
650                Ok(vectors_created) => Ok(Response::new(VectorizeSessionResponse {
651                    success: true,
652                    vectors_created: vectors_created as u32,
653                })),
654                Err(e) => {
655                    error!("gRPC VectorizeSession failed: {}", e);
656                    Err(Status::internal(e))
657                }
658            }
659        }
660
661        #[cfg(not(feature = "embeddings"))]
662        {
663            Err(Status::unimplemented(
664                "VectorizeSession requires the 'embeddings' feature",
665            ))
666        }
667    }
668
669    async fn get_vectorization_stats(
670        &self,
671        _request: Request<GetVectorizationStatsRequest>,
672    ) -> Result<Response<TextResponse>, Status> {
673        debug!("gRPC GetVectorizationStats");
674
675        #[cfg(feature = "embeddings")]
676        {
677            match self.memory.get_vectorization_stats() {
678                Ok(stats) => {
679                    let text =
680                        serde_json::to_string_pretty(&stats).unwrap_or_else(|_| "{}".to_string());
681                    Ok(Response::new(TextResponse { text }))
682                }
683                Err(e) => {
684                    error!("gRPC GetVectorizationStats failed: {}", e);
685                    Err(Status::internal(e))
686                }
687            }
688        }
689
690        #[cfg(not(feature = "embeddings"))]
691        {
692            Err(Status::unimplemented(
693                "GetVectorizationStats requires the 'embeddings' feature",
694            ))
695        }
696    }
697
698    // --- Analysis & Insights ---
699
700    async fn get_structured_summary(
701        &self,
702        request: Request<GetStructuredSummaryRequest>,
703    ) -> Result<Response<TextResponse>, Status> {
704        let req = request.into_inner();
705        debug!("gRPC GetStructuredSummary: session_id={}", req.session_id);
706
707        let session_id = parse_uuid(&req.session_id)?;
708        let session_arc = self
709            .memory
710            .get_session(session_id)
711            .await
712            .map_err(Status::not_found)?;
713        let session = session_arc.load();
714
715        use post_cortex_core::summary::{SummaryGenerator, SummaryOptions};
716        let user_requested_compact = req.compact.unwrap_or(false);
717        const MAX_TOKENS: usize = 50_000;
718
719        let (_estimated_tokens, should_compact) =
720            SummaryGenerator::estimate_summary_size(&session, MAX_TOKENS);
721        let auto_compacted = !user_requested_compact && should_compact;
722
723        let options = if user_requested_compact || auto_compacted {
724            SummaryOptions::compact()
725        } else {
726            SummaryOptions {
727                decisions_limit: req.decisions_limit.map(|v| v as usize),
728                entities_limit: req.entities_limit.map(|v| v as usize),
729                questions_limit: req.questions_limit.map(|v| v as usize),
730                concepts_limit: req.concepts_limit.map(|v| v as usize),
731                min_confidence: req.min_confidence,
732                compact: false,
733            }
734        };
735
736        let summary = SummaryGenerator::generate_structured_summary_filtered(&session, &options);
737        let text =
738            serde_json::to_string_pretty(&summary).unwrap_or_else(|_| format!("{summary:?}"));
739
740        Ok(Response::new(TextResponse { text }))
741    }
742
743    async fn get_key_decisions(
744        &self,
745        request: Request<GetKeyDecisionsRequest>,
746    ) -> Result<Response<TextResponse>, Status> {
747        let req = request.into_inner();
748        debug!("gRPC GetKeyDecisions: session_id={}", req.session_id);
749
750        let session_id = parse_uuid(&req.session_id)?;
751        let session_arc = self
752            .memory
753            .get_session(session_id)
754            .await
755            .map_err(Status::not_found)?;
756        let session = session_arc.load();
757
758        use post_cortex_core::summary::SummaryGenerator;
759        let decisions = SummaryGenerator::extract_decision_timeline(&session);
760        let text =
761            serde_json::to_string_pretty(&decisions).unwrap_or_else(|_| format!("{decisions:?}"));
762
763        Ok(Response::new(TextResponse { text }))
764    }
765
766    async fn get_key_insights(
767        &self,
768        request: Request<GetKeyInsightsRequest>,
769    ) -> Result<Response<TextResponse>, Status> {
770        let req = request.into_inner();
771        debug!("gRPC GetKeyInsights: session_id={}", req.session_id);
772
773        let session_id = parse_uuid(&req.session_id)?;
774        let session_arc = self
775            .memory
776            .get_session(session_id)
777            .await
778            .map_err(Status::not_found)?;
779        let session = session_arc.load();
780
781        use post_cortex_core::summary::SummaryGenerator;
782        let insights = SummaryGenerator::extract_key_insights(
783            &session,
784            req.limit.map(|v| v as usize).unwrap_or(5),
785        );
786        let text =
787            serde_json::to_string_pretty(&insights).unwrap_or_else(|_| format!("{insights:?}"));
788
789        Ok(Response::new(TextResponse { text }))
790    }
791
792    async fn get_entity_importance(
793        &self,
794        request: Request<GetEntityImportanceRequest>,
795    ) -> Result<Response<TextResponse>, Status> {
796        let req = request.into_inner();
797        debug!("gRPC GetEntityImportance: session_id={}", req.session_id);
798
799        let session_id = parse_uuid(&req.session_id)?;
800        let session_arc = self
801            .memory
802            .get_session(session_id)
803            .await
804            .map_err(Status::not_found)?;
805        let session = session_arc.load();
806
807        let mut analysis = session.entity_graph.analyze_entity_importance();
808        if let Some(min_imp) = req.min_importance {
809            analysis.retain(|a| a.importance_score >= min_imp);
810        }
811        if let Some(limit) = req.limit {
812            analysis.truncate(limit as usize);
813        }
814        let text =
815            serde_json::to_string_pretty(&analysis).unwrap_or_else(|_| format!("{analysis:?}"));
816
817        Ok(Response::new(TextResponse { text }))
818    }
819
820    async fn get_entity_network(
821        &self,
822        request: Request<GetEntityNetworkRequest>,
823    ) -> Result<Response<TextResponse>, Status> {
824        let req = request.into_inner();
825        debug!("gRPC GetEntityNetwork: session_id={}", req.session_id);
826
827        let session_id = parse_uuid(&req.session_id)?;
828        let session_arc = self
829            .memory
830            .get_session(session_id)
831            .await
832            .map_err(Status::not_found)?;
833        let session = session_arc.load();
834
835        let network = match req.center_entity {
836            Some(entity) => session.entity_graph.get_entity_network(&entity, 2),
837            None => {
838                let top_entities = session.entity_graph.get_most_important_entities(1);
839                if let Some(top) = top_entities.first() {
840                    session.entity_graph.get_entity_network(&top.name, 2)
841                } else {
842                    session.entity_graph.get_entity_network("", 2)
843                }
844            }
845        };
846        let text =
847            serde_json::to_string_pretty(&network).unwrap_or_else(|_| format!("{network:?}"));
848
849        Ok(Response::new(TextResponse { text }))
850    }
851
852    async fn get_session_statistics(
853        &self,
854        request: Request<GetSessionStatisticsRequest>,
855    ) -> Result<Response<TextResponse>, Status> {
856        let req = request.into_inner();
857        debug!("gRPC GetSessionStatistics: session_id={}", req.session_id);
858
859        let session_id = parse_uuid(&req.session_id)?;
860        let session_arc = self
861            .memory
862            .get_session(session_id)
863            .await
864            .map_err(Status::not_found)?;
865        let session = session_arc.load();
866
867        use post_cortex_core::summary::SummaryGenerator;
868        let stats = SummaryGenerator::calculate_session_stats(&session);
869        let text = serde_json::to_string_pretty(&stats).unwrap_or_else(|_| format!("{stats:?}"));
870
871        Ok(Response::new(TextResponse { text }))
872    }
873
874    // --- Graph-Aware Context Assembly ---
875
876    async fn assemble_context(
877        &self,
878        request: Request<AssembleContextRequest>,
879    ) -> Result<Response<AssembleContextResponse>, Status> {
880        let req = request.into_inner();
881        debug!(
882            "gRPC AssembleContext: session_id={}, workspace_id={:?}, query={}",
883            req.session_id, req.workspace_id, req.query
884        );
885
886        let token_budget = if req.token_budget == 0 {
887            4000
888        } else {
889            req.token_budget as usize
890        };
891
892        use post_cortex_core::graph::entity_graph::SimpleEntityGraph;
893        use post_cortex_memory::context_assembly;
894
895        // Resolve scope: workspace_id takes precedence over session_id
896        let (updates, graph) = if req
897            .workspace_id
898            .as_deref()
899            .map(|s| !s.is_empty())
900            .unwrap_or(false)
901        {
902            // Workspace-scoped: merge context from all sessions in the workspace
903            let workspace_id = parse_uuid(req.workspace_id.as_deref().unwrap_or(""))?;
904            let workspace = self
905                .memory
906                .workspace_manager
907                .get_workspace(&workspace_id)
908                .ok_or_else(|| Status::not_found(format!("Workspace {workspace_id} not found")))?;
909
910            let mut all_updates = Vec::new();
911            let mut merged_graph = SimpleEntityGraph::new();
912
913            for (ws_session_id, _role) in workspace.get_all_sessions() {
914                match self.memory.get_session(ws_session_id).await {
915                    Ok(session_arc) => {
916                        let session = session_arc.load();
917                        all_updates.extend(session.hot_context.iter().iter().cloned());
918                        all_updates.extend(session.warm_context.iter().map(|c| c.update.clone()));
919                        merged_graph.merge_from(&session.entity_graph);
920                    }
921                    Err(e) => {
922                        warn!(
923                            "AssembleContext: could not load session {} from workspace {}: {}",
924                            ws_session_id, workspace_id, e
925                        );
926                    }
927                }
928            }
929
930            (all_updates, merged_graph)
931        } else {
932            // Single-session (existing behaviour)
933            let session_id = parse_uuid(&req.session_id)?;
934            let session_arc = self
935                .memory
936                .get_session(session_id)
937                .await
938                .map_err(Status::not_found)?;
939            let session = session_arc.load();
940            let updates: Vec<_> = session
941                .hot_context
942                .iter()
943                .iter()
944                .chain(session.warm_context.iter().map(|c| &c.update))
945                .cloned()
946                .collect();
947            (updates, (*session.entity_graph).clone())
948        };
949
950        let assembled =
951            context_assembly::assemble_context(&req.query, &graph, &updates, token_budget);
952
953        let formatted_text = context_assembly::format_for_llm(&assembled);
954
955        // Convert to proto types
956        let items: Vec<AssembledContextItem> = assembled
957            .items
958            .iter()
959            .map(|item| {
960                let (source, via_entity) = match &item.source {
961                    context_assembly::ContextSource::SemanticMatch => {
962                        ("semantic_match".to_string(), String::new())
963                    }
964                    context_assembly::ContextSource::GraphTraversal { via_entity } => {
965                        ("graph_traversal".to_string(), via_entity.clone())
966                    }
967                    context_assembly::ContextSource::RecentUpdate => {
968                        ("recent_update".to_string(), String::new())
969                    }
970                };
971                AssembledContextItem {
972                    text: item.text.clone(),
973                    score: item.score,
974                    source,
975                    via_entity,
976                    entities: item.entities.clone(),
977                    token_estimate: item.token_estimate as u32,
978                    entry_id: item.entry_id.clone(),
979                }
980            })
981            .collect();
982
983        let entity_context: Vec<AssembledEntityContext> = assembled
984            .entity_context
985            .iter()
986            .map(|ec| {
987                let (relevance, via_entity, via_relation) = match &ec.relevance {
988                    context_assembly::EntityRelevance::DirectMention => {
989                        ("direct_mention".to_string(), String::new(), String::new())
990                    }
991                    context_assembly::EntityRelevance::GraphNeighbor { via, relation } => {
992                        ("graph_neighbor".to_string(), via.clone(), relation.clone())
993                    }
994                };
995                let relationships: Vec<EntityRelation> = ec
996                    .relationships
997                    .iter()
998                    .map(|r| EntityRelation {
999                        from_entity: r.from_entity.clone(),
1000                        to_entity: r.to_entity.clone(),
1001                        relation_type: format!("{:?}", r.relation_type),
1002                        context: r.context.clone(),
1003                    })
1004                    .collect();
1005                AssembledEntityContext {
1006                    name: ec.name.clone(),
1007                    relevance,
1008                    via_entity,
1009                    via_relation,
1010                    relationships,
1011                }
1012            })
1013            .collect();
1014
1015        let impact: Vec<pb::ImpactEntry> = assembled
1016            .impact
1017            .iter()
1018            .map(|i| pb::ImpactEntry {
1019                entity: i.entity.clone(),
1020                depends_on: i.depends_on.clone(),
1021                relation_type: format!("{:?}", i.relation_type),
1022                context: i.context.clone(),
1023            })
1024            .collect();
1025
1026        Ok(Response::new(AssembleContextResponse {
1027            items,
1028            entity_context,
1029            impact,
1030            total_tokens: assembled.total_tokens as u32,
1031            formatted_text,
1032        }))
1033    }
1034
1035    // --- Workspace Management ---
1036
1037    async fn create_workspace(
1038        &self,
1039        request: Request<CreateWorkspaceRequest>,
1040    ) -> Result<Response<CreateWorkspaceResponse>, Status> {
1041        let req = request.into_inner();
1042        debug!("gRPC CreateWorkspace: name={}", req.name);
1043
1044        let workspace_id = self
1045            .memory
1046            .workspace_manager
1047            .create_workspace(req.name.clone(), req.description.clone());
1048
1049        // Persist to storage
1050        if let Err(e) = self
1051            .memory
1052            .save_workspace_metadata(workspace_id, &req.name, &req.description, &[])
1053            .await
1054        {
1055            warn!("gRPC CreateWorkspace: failed to persist metadata: {}", e);
1056        }
1057
1058        Ok(Response::new(CreateWorkspaceResponse {
1059            workspace_id: workspace_id.to_string(),
1060        }))
1061    }
1062
1063    async fn get_workspace(
1064        &self,
1065        request: Request<GetWorkspaceRequest>,
1066    ) -> Result<Response<GetWorkspaceResponse>, Status> {
1067        let req = request.into_inner();
1068        debug!("gRPC GetWorkspace: workspace_id={}", req.workspace_id);
1069
1070        let workspace_id = parse_uuid(&req.workspace_id)?;
1071
1072        let workspace = self
1073            .memory
1074            .workspace_manager
1075            .get_workspace(&workspace_id)
1076            .ok_or_else(|| Status::not_found(format!("Workspace {} not found", workspace_id)))?;
1077
1078        let info = workspace_to_info(&workspace);
1079
1080        Ok(Response::new(GetWorkspaceResponse {
1081            workspace: Some(info),
1082        }))
1083    }
1084
1085    async fn list_workspaces(
1086        &self,
1087        _request: Request<ListWorkspacesRequest>,
1088    ) -> Result<Response<ListWorkspacesResponse>, Status> {
1089        debug!("gRPC ListWorkspaces");
1090
1091        let workspaces = self.memory.workspace_manager.list_workspaces();
1092
1093        let infos: Vec<WorkspaceInfo> = workspaces.iter().map(|ws| workspace_to_info(ws)).collect();
1094
1095        Ok(Response::new(ListWorkspacesResponse { workspaces: infos }))
1096    }
1097
1098    async fn delete_workspace(
1099        &self,
1100        request: Request<DeleteWorkspaceRequest>,
1101    ) -> Result<Response<DeleteWorkspaceResponse>, Status> {
1102        let req = request.into_inner();
1103        debug!("gRPC DeleteWorkspace: workspace_id={}", req.workspace_id);
1104
1105        let workspace_id = parse_uuid(&req.workspace_id)?;
1106
1107        let deleted = self
1108            .memory
1109            .workspace_manager
1110            .delete_workspace(&workspace_id);
1111
1112        let success = deleted.is_some();
1113        Ok(Response::new(DeleteWorkspaceResponse { success }))
1114    }
1115
1116    async fn add_session_to_workspace(
1117        &self,
1118        request: Request<AddSessionToWorkspaceRequest>,
1119    ) -> Result<Response<AddSessionToWorkspaceResponse>, Status> {
1120        let req = request.into_inner();
1121        debug!(
1122            "gRPC AddSessionToWorkspace: workspace={}, session={}, role={}",
1123            req.workspace_id, req.session_id, req.role
1124        );
1125
1126        let workspace_id = parse_uuid(&req.workspace_id)?;
1127        let session_id = parse_uuid(&req.session_id)?;
1128        let role = parse_session_role(&req.role);
1129
1130        self.memory
1131            .workspace_manager
1132            .add_session_to_workspace(&workspace_id, session_id, role)
1133            .map_err(Status::not_found)?;
1134
1135        // Persist updated workspace membership
1136        if let Some(ws) = self.memory.workspace_manager.get_workspace(&workspace_id) {
1137            let session_ids: Vec<Uuid> = ws.session_ids.iter().map(|entry| *entry.key()).collect();
1138            if let Err(e) = self
1139                .memory
1140                .save_workspace_metadata(workspace_id, &ws.name, &ws.description, &session_ids)
1141                .await
1142            {
1143                warn!(
1144                    "gRPC AddSessionToWorkspace: failed to persist metadata: {}",
1145                    e
1146                );
1147            }
1148        }
1149
1150        Ok(Response::new(AddSessionToWorkspaceResponse {
1151            success: true,
1152        }))
1153    }
1154
1155    async fn remove_session_from_workspace(
1156        &self,
1157        request: Request<RemoveSessionFromWorkspaceRequest>,
1158    ) -> Result<Response<RemoveSessionFromWorkspaceResponse>, Status> {
1159        let req = request.into_inner();
1160        debug!(
1161            "gRPC RemoveSessionFromWorkspace: workspace={}, session={}",
1162            req.workspace_id, req.session_id
1163        );
1164
1165        let workspace_id = parse_uuid(&req.workspace_id)?;
1166        let session_id = parse_uuid(&req.session_id)?;
1167
1168        self.memory
1169            .workspace_manager
1170            .remove_session_from_workspace(&workspace_id, &session_id)
1171            .map_err(Status::not_found)?;
1172
1173        // Persist updated workspace membership
1174        if let Some(ws) = self.memory.workspace_manager.get_workspace(&workspace_id) {
1175            let session_ids: Vec<Uuid> = ws.session_ids.iter().map(|entry| *entry.key()).collect();
1176            if let Err(e) = self
1177                .memory
1178                .save_workspace_metadata(workspace_id, &ws.name, &ws.description, &session_ids)
1179                .await
1180            {
1181                warn!(
1182                    "gRPC RemoveSessionFromWorkspace: failed to persist metadata: {}",
1183                    e
1184                );
1185            }
1186        }
1187
1188        Ok(Response::new(RemoveSessionFromWorkspaceResponse {
1189            success: true,
1190        }))
1191    }
1192
1193    // --- Source Tracking (Phase 9): delegated to ./freshness.rs ---
1194
1195    async fn register_source(
1196        &self,
1197        request: Request<RegisterSourceRequest>,
1198    ) -> Result<Response<RegisterSourceAck>, Status> {
1199        self.register_source_impl(request).await
1200    }
1201
1202    async fn register_source_batch(
1203        &self,
1204        request: Request<RegisterSourceBatchRequest>,
1205    ) -> Result<Response<RegisterSourceBatchAck>, Status> {
1206        self.register_source_batch_impl(request).await
1207    }
1208
1209    async fn check_freshness(
1210        &self,
1211        request: Request<FreshnessRequest>,
1212    ) -> Result<Response<FreshnessReport>, Status> {
1213        self.check_freshness_impl(request).await
1214    }
1215
1216    async fn invalidate(
1217        &self,
1218        request: Request<InvalidateRequest>,
1219    ) -> Result<Response<InvalidateAck>, Status> {
1220        self.invalidate_impl(request).await
1221    }
1222
1223    async fn register_symbol_dependency(
1224        &self,
1225        request: Request<RegisterSymbolDependencyRequest>,
1226    ) -> Result<Response<RegisterSymbolDependencyAck>, Status> {
1227        self.register_symbol_dependency_impl(request).await
1228    }
1229
1230    async fn cascade_invalidate(
1231        &self,
1232        request: Request<CascadeInvalidateRequest>,
1233    ) -> Result<Response<CascadeInvalidateReport>, Status> {
1234        self.cascade_invalidate_impl(request).await
1235    }
1236
1237    async fn get_stale_entries_by_source(
1238        &self,
1239        request: Request<GetStaleEntriesBySourceRequest>,
1240    ) -> Result<Response<GetStaleEntriesBySourceResponse>, Status> {
1241        self.get_stale_entries_by_source_impl(request).await
1242    }
1243}
1244
1245/// Start the gRPC server on the given port.
1246/// Returns a future that runs until cancelled.
1247pub async fn start_grpc_server(
1248    memory: Arc<ConversationMemorySystem>,
1249    addr: std::net::SocketAddr,
1250) -> Result<(), String> {
1251    let service = PcxGrpcService::new(memory);
1252
1253    info!("Starting gRPC server on {}", addr);
1254
1255    tonic::transport::Server::builder()
1256        .add_service(service.into_server())
1257        .serve(addr)
1258        .await
1259        .map_err(|e| format!("gRPC server error: {e}"))
1260}