Skip to main content

post_cortex_daemon/daemon/grpc_service/
mod.rs

1// Copyright (c) 2025, 2026 Julius ML
2// MIT License
3
4//! gRPC service for Post-Cortex.
5//!
6//! Provides a tonic gRPC interface to the memory system, enabling
7//! native binary protocol access for coding agents like Axon.
8//!
9//! Layout: the canonical `impl PostCortex for PcxGrpcService` block lives in
10//! this file. Freshness-tracking methods (Phase 9) delegate to inherent
11//! `*_impl` methods in `freshness` to keep this file scoped to the
12//! interactive surface. Shared parsing/validation helpers live in
13//! `helpers`.
14
15use post_cortex_core::services::PostCortexService;
16use post_cortex_memory::services::MemoryServiceImpl;
17use post_cortex_memory::ConversationMemorySystem;
18use post_cortex_storage::rocksdb_storage::SessionCheckpoint;
19use std::sync::Arc;
20use tonic::{Request, Response, Status};
21use tracing::{debug, error, info, warn};
22use uuid::Uuid;
23
24// Phase 2 of the workspace refactor moved the proto-generated bindings into
25// the dedicated `post-cortex-proto` crate. The re-export below preserves the
26// long-standing `crate::daemon::grpc_service::pb::…` path for in-tree
27// callers inside `src/daemon/` while every consumer in `src/core/` and
28// `src/storage/` now imports from `post_cortex_proto::pb` directly.
29pub use post_cortex_proto::pb;
30
31mod freshness;
32mod helpers;
33
34use helpers::{
35    parse_session_role, parse_uuid, proto_bulk_item_to_request, proto_update_to_request,
36    system_error_to_status, workspace_to_info,
37};
38use pb::post_cortex_server::{PostCortex, PostCortexServer};
39use pb::*;
40
/// gRPC service backed by ConversationMemorySystem.
///
/// Holds both the raw [`ConversationMemorySystem`] (used by handlers
/// that have not yet migrated to the canonical trait) and the
/// canonical [`MemoryServiceImpl`]. Newly-migrated handlers — currently
/// `update_context` / `bulk_update_context` — translate proto types
/// into [`post_cortex_core::services`] requests and delegate to
/// `self.service`, so validation lives in exactly one place.
pub struct PcxGrpcService {
    /// Raw memory system, called directly by handlers that have not yet
    /// migrated to the canonical service.
    pub(super) memory: Arc<ConversationMemorySystem>,
    /// Canonical service that the migrated handlers delegate to.
    pub(super) service: Arc<MemoryServiceImpl>,
}
53
54impl PcxGrpcService {
55    /// Wrap a shared memory system in a new gRPC service handle.
56    pub fn new(memory: Arc<ConversationMemorySystem>) -> Self {
57        let service = Arc::new(MemoryServiceImpl::new(memory.clone()));
58        Self { memory, service }
59    }
60
61    /// Build a gRPC service from an existing canonical service so several
62    /// transports can share the same background pipeline (Phase 11).
63    pub fn from_service(service: Arc<MemoryServiceImpl>) -> Self {
64        let memory = service.inner().clone();
65        Self { memory, service }
66    }
67
68    /// Consume `self` and return a tonic [`PostCortexServer`] ready to serve.
69    pub fn into_server(self) -> PostCortexServer<Self> {
70        PostCortexServer::new(self)
71    }
72}
73
74#[tonic::async_trait]
75impl PostCortex for PcxGrpcService {
76    async fn health(
77        &self,
78        _request: Request<HealthRequest>,
79    ) -> Result<Response<HealthResponse>, Status> {
80        let health = self.memory.get_system_health();
81
82        Ok(Response::new(HealthResponse {
83            healthy: !health.circuit_breaker_open,
84            version: env!("CARGO_PKG_VERSION").to_string(),
85            active_sessions: health.active_sessions as u64,
86            total_updates: health.total_requests,
87            embeddings_enabled: self.memory.embeddings_enabled(),
88        }))
89    }
90
91    async fn create_session(
92        &self,
93        request: Request<CreateSessionRequest>,
94    ) -> Result<Response<CreateSessionResponse>, Status> {
95        let req = request.into_inner();
96        debug!("gRPC CreateSession: name={}", req.name);
97
98        let name = if req.name.is_empty() {
99            None
100        } else {
101            Some(req.name)
102        };
103        let description = if req.description.is_empty() {
104            None
105        } else {
106            Some(req.description)
107        };
108
109        match self.memory.create_session(name, description).await {
110            Ok(session_id) => Ok(Response::new(CreateSessionResponse {
111                session_id: session_id.to_string(),
112            })),
113            Err(e) => {
114                error!("gRPC CreateSession failed: {}", e);
115                Err(Status::internal(e))
116            }
117        }
118    }
119
120    async fn list_sessions(
121        &self,
122        request: Request<ListSessionsRequest>,
123    ) -> Result<Response<ListSessionsResponse>, Status> {
124        let req = request.into_inner();
125
126        let session_ids = if !req.name_filter.is_empty() {
127            self.memory
128                .find_sessions_by_name_or_description(&req.name_filter)
129                .await
130                .map_err(Status::internal)?
131        } else {
132            self.memory
133                .list_sessions()
134                .await
135                .map_err(Status::internal)?
136        };
137
138        let limit = if req.limit > 0 {
139            req.limit as usize
140        } else {
141            100
142        };
143
144        let mut sessions = Vec::new();
145        for session_id in session_ids.into_iter().take(limit) {
146            if let Ok(session_arc) = self.memory.get_session(session_id).await {
147                let session = session_arc.load();
148                sessions.push(SessionInfo {
149                    session_id: session_id.to_string(),
150                    name: session.name().unwrap_or_default(),
151                    description: session.description().unwrap_or_default(),
152                    created_at_unix: session.created_at().timestamp(),
153                    update_count: session.incremental_updates.len() as u32,
154                });
155            }
156        }
157
158        Ok(Response::new(ListSessionsResponse { sessions }))
159    }
160
161    async fn load_session(
162        &self,
163        request: Request<LoadSessionRequest>,
164    ) -> Result<Response<LoadSessionResponse>, Status> {
165        let req = request.into_inner();
166        debug!("gRPC LoadSession: session_id={}", req.session_id);
167
168        let session_id = parse_uuid(&req.session_id)?;
169
170        let session_arc = self
171            .memory
172            .get_session(session_id)
173            .await
174            .map_err(Status::not_found)?;
175        let session = session_arc.load();
176
177        let info = SessionInfo {
178            session_id: session_id.to_string(),
179            name: session.name().unwrap_or_default(),
180            description: session.description().unwrap_or_default(),
181            created_at_unix: session.created_at().timestamp(),
182            update_count: session.incremental_updates.len() as u32,
183        };
184
185        Ok(Response::new(LoadSessionResponse {
186            session: Some(info),
187        }))
188    }
189
190    async fn search_sessions(
191        &self,
192        request: Request<SearchSessionsRequest>,
193    ) -> Result<Response<SearchSessionsResponse>, Status> {
194        let req = request.into_inner();
195        debug!("gRPC SearchSessions: query={}", req.query);
196
197        if req.query.is_empty() {
198            return Err(Status::invalid_argument("query cannot be empty"));
199        }
200
201        let session_ids = self
202            .memory
203            .find_sessions_by_name_or_description(&req.query)
204            .await
205            .map_err(Status::internal)?;
206
207        let mut sessions = Vec::new();
208        for session_id in session_ids {
209            if let Ok(session_arc) = self.memory.get_session(session_id).await {
210                let session = session_arc.load();
211                sessions.push(SessionInfo {
212                    session_id: session_id.to_string(),
213                    name: session.name().unwrap_or_default(),
214                    description: session.description().unwrap_or_default(),
215                    created_at_unix: session.created_at().timestamp(),
216                    update_count: session.incremental_updates.len() as u32,
217                });
218            }
219        }
220
221        Ok(Response::new(SearchSessionsResponse { sessions }))
222    }
223
224    async fn update_session_metadata(
225        &self,
226        request: Request<UpdateSessionMetadataRequest>,
227    ) -> Result<Response<UpdateSessionMetadataResponse>, Status> {
228        let req = request.into_inner();
229        debug!("gRPC UpdateSessionMetadata: session_id={}", req.session_id);
230
231        let session_id = parse_uuid(&req.session_id)?;
232
233        match self
234            .memory
235            .update_session_metadata(session_id, req.name, req.description)
236            .await
237        {
238            Ok(()) => Ok(Response::new(UpdateSessionMetadataResponse {
239                success: true,
240            })),
241            Err(e) => {
242                error!("gRPC UpdateSessionMetadata failed: {}", e);
243                Err(Status::internal(e))
244            }
245        }
246    }
247
248    async fn delete_session(
249        &self,
250        request: Request<DeleteSessionRequest>,
251    ) -> Result<Response<DeleteSessionResponse>, Status> {
252        let req = request.into_inner();
253        debug!("gRPC DeleteSession: session_id={}", req.session_id);
254
255        let session_id = parse_uuid(&req.session_id)?;
256
257        match self.memory.delete_session(session_id).await {
258            Ok(success) => Ok(Response::new(DeleteSessionResponse { success })),
259            Err(e) => {
260                error!("gRPC DeleteSession failed: {}", e);
261                Err(Status::internal(e))
262            }
263        }
264    }
265
266    async fn delete_entity(
267        &self,
268        request: Request<DeleteEntityRequest>,
269    ) -> Result<Response<DeleteEntityResponse>, Status> {
270        let req = request.into_inner();
271        debug!(
272            "gRPC DeleteEntity: session_id={} entity={}",
273            req.session_id, req.entity_name
274        );
275
276        if req.entity_name.is_empty() {
277            return Err(Status::invalid_argument("entity_name must not be empty"));
278        }
279        let session_id = parse_uuid(&req.session_id)?;
280
281        match self.memory.delete_entity(session_id, &req.entity_name).await {
282            Ok(existed) => Ok(Response::new(DeleteEntityResponse { existed })),
283            Err(e) => {
284                error!("gRPC DeleteEntity failed: {}", e);
285                Err(Status::internal(e))
286            }
287        }
288    }
289
290    async fn update_context(
291        &self,
292        request: Request<UpdateContextRequest>,
293    ) -> Result<Response<UpdateContextResponse>, Status> {
294        let req = request.into_inner();
295        debug!(
296            "gRPC UpdateContext: session={}, type={}",
297            req.session_id, req.interaction_type
298        );
299
300        let service_req = proto_update_to_request(req)?;
301        let resp = self
302            .service
303            .update_context(service_req)
304            .await
305            .map_err(|e| {
306                error!("gRPC UpdateContext failed: {}", e);
307                system_error_to_status(e)
308            })?;
309
310        Ok(Response::new(UpdateContextResponse {
311            update_id: resp.entry_id.to_string(),
312            success: resp.durable,
313        }))
314    }
315
316    async fn bulk_update_context(
317        &self,
318        request: Request<BulkUpdateContextRequest>,
319    ) -> Result<Response<BulkUpdateContextResponse>, Status> {
320        let req = request.into_inner();
321        debug!(
322            "gRPC BulkUpdateContext: session={}, count={}",
323            req.session_id,
324            req.updates.len()
325        );
326
327        // Translate every proto item to its canonical request, propagating
328        // session_id from the batch envelope (proto items don't carry it).
329        let batch_session_id = parse_uuid(&req.session_id)?;
330        let mut update_ids = Vec::new();
331        let mut success_count = 0u32;
332        let mut failure_count = 0u32;
333
334        for item in req.updates {
335            let service_req = match proto_bulk_item_to_request(batch_session_id, item) {
336                Ok(r) => r,
337                Err(status) => {
338                    warn!(
339                        "gRPC BulkUpdateContext item translation failed: {}",
340                        status.message()
341                    );
342                    failure_count += 1;
343                    continue;
344                }
345            };
346
347            match self.service.update_context(service_req).await {
348                Ok(resp) => {
349                    update_ids.push(resp.entry_id.to_string());
350                    success_count += 1;
351                }
352                Err(e) => {
353                    error!("gRPC BulkUpdateContext item failed: {}", e);
354                    failure_count += 1;
355                }
356            }
357        }
358
359        Ok(Response::new(BulkUpdateContextResponse {
360            update_ids,
361            success_count,
362            failure_count,
363        }))
364    }
365
366    async fn create_checkpoint(
367        &self,
368        request: Request<CreateCheckpointRequest>,
369    ) -> Result<Response<CreateCheckpointResponse>, Status> {
370        let req = request.into_inner();
371        debug!("gRPC CreateCheckpoint: session_id={}", req.session_id);
372
373        let session_id = parse_uuid(&req.session_id)?;
374
375        let session_arc = self
376            .memory
377            .get_session(session_id)
378            .await
379            .map_err(Status::not_found)?;
380        let session = session_arc.load();
381
382        let checkpoint = SessionCheckpoint {
383            id: Uuid::new_v4(),
384            session_id,
385            created_at: chrono::Utc::now(),
386            structured_context: (*session.current_state).clone(),
387            recent_updates: (*session.incremental_updates).clone(),
388            code_references: (*session.code_references).clone(),
389            change_history: (*session.change_history).clone(),
390            total_updates: session.incremental_updates.len(),
391            context_quality_score: 1.0,
392            compression_ratio: 1.0,
393        };
394        let checkpoint_id = checkpoint.id.to_string();
395
396        match self.memory.storage_actor.save_checkpoint(&checkpoint).await {
397            Ok(()) => Ok(Response::new(CreateCheckpointResponse {
398                checkpoint_id,
399                success: true,
400            })),
401            Err(e) => {
402                error!("gRPC CreateCheckpoint failed: {}", e);
403                Err(Status::internal(e))
404            }
405        }
406    }
407
408    async fn query_context(
409        &self,
410        request: Request<QueryContextRequest>,
411    ) -> Result<Response<QueryContextResponse>, Status> {
412        let req = request.into_inner();
413        let session_id = parse_uuid(&req.session_id)?;
414
415        let session_arc = self
416            .memory
417            .get_session(session_id)
418            .await
419            .map_err(Status::not_found)?;
420        let session = session_arc.load();
421
422        let limit = if req.limit > 0 {
423            req.limit as usize
424        } else {
425            50
426        };
427
428        let updates: Vec<ContextUpdateEntry> = session
429            .incremental_updates
430            .iter()
431            .filter(|u| {
432                if !req.interaction_type.is_empty() {
433                    let ut = format!("{:?}", u.update_type);
434                    ut.to_lowercase()
435                        .contains(&req.interaction_type.to_lowercase())
436                } else {
437                    true
438                }
439            })
440            .filter(|u| {
441                if req.after_unix > 0 {
442                    u.timestamp.timestamp() > req.after_unix
443                } else {
444                    true
445                }
446            })
447            .take(limit)
448            .map(|u| ContextUpdateEntry {
449                id: u.id.to_string(),
450                interaction_type: format!("{:?}", u.update_type),
451                content: Some(ContextContent {
452                    title: u.content.title.clone(),
453                    description: u.content.description.clone(),
454                    details: u.content.details.clone(),
455                    examples: u.content.examples.clone(),
456                    implications: u.content.implications.clone(),
457                    code_ref: u.related_code.as_ref().map(|c| CodeReference {
458                        file_path: c.file_path.clone(),
459                        start_line: c.start_line,
460                        end_line: c.end_line,
461                        code_snippet: c.code_snippet.clone(),
462                        commit_hash: c.commit_hash.clone().unwrap_or_default(),
463                        branch: c.branch.clone().unwrap_or_default(),
464                        change_description: c.change_description.clone(),
465                    }),
466                }),
467                timestamp_unix: u.timestamp.timestamp(),
468                entities: u.creates_entities.clone(),
469                source_ref: None, // Source tracking not yet wired
470            })
471            .collect();
472
473        let total = updates.len() as u32;
474        Ok(Response::new(QueryContextResponse { updates, total }))
475    }
476
477    async fn semantic_search(
478        &self,
479        request: Request<SemanticSearchRequest>,
480    ) -> Result<Response<SemanticSearchResponse>, Status> {
481        let req = request.into_inner();
482        debug!(
483            "gRPC SemanticSearch: query='{}', session='{}', workspace='{:?}'",
484            req.query, req.session_id, req.workspace_id
485        );
486
487        if req.query.is_empty() {
488            return Err(Status::invalid_argument("query cannot be empty"));
489        }
490
491        let max_results = if req.max_results > 0 {
492            req.max_results as usize
493        } else {
494            10
495        };
496
497        #[cfg(feature = "embeddings")]
498        {
499            // workspace_id takes precedence: search each session independently and merge results
500            let workspace_id_str = req.workspace_id.as_deref().unwrap_or("");
501            let search_results = if !workspace_id_str.is_empty() {
502                // Workspace-scoped: search all sessions in the workspace and merge
503                let workspace_id = parse_uuid(workspace_id_str)?;
504                let workspace = self
505                    .memory
506                    .workspace_manager
507                    .get_workspace(&workspace_id)
508                    .ok_or_else(|| Status::not_found(format!("Workspace {workspace_id} not found")))?;
509
510                let session_ids: Vec<uuid::Uuid> = workspace
511                    .get_all_sessions()
512                    .into_iter()
513                    .map(|(sid, _)| sid)
514                    .collect();
515
516                let mut merged = Vec::new();
517                for sid in session_ids {
518                    match self
519                        .memory
520                        .semantic_search_session(sid, &req.query, Some(max_results), None, None)
521                        .await
522                    {
523                        Ok(results) => merged.extend(results),
524                        Err(e) => warn!(
525                            "SemanticSearch: session {} search failed: {}",
526                            sid, e
527                        ),
528                    }
529                }
530                // Sort by combined_score descending and cap at max_results
531                merged.sort_by(|a, b| b.combined_score.partial_cmp(&a.combined_score).unwrap_or(std::cmp::Ordering::Equal));
532                merged.truncate(max_results);
533                merged
534            } else if req.session_id.is_empty() {
535                // Global search
536                self.memory
537                    .semantic_search_global(&req.query, Some(max_results), None, None)
538                    .await
539                    .map_err(|e| Status::internal(format!("Search failed: {e}")))?
540            } else {
541                // Session-scoped search
542                let session_id = parse_uuid(&req.session_id)?;
543                self.memory
544                    .semantic_search_session(session_id, &req.query, Some(max_results), None, None)
545                    .await
546                    .map_err(|e| Status::internal(format!("Search failed: {e}")))?
547            };
548
549            let min_score = if req.min_score > 0.0 {
550                req.min_score
551            } else {
552                0.0
553            };
554
555            let results: Vec<SearchResult> = search_results
556                .into_iter()
557                .filter(|r| r.combined_score >= min_score)
558                .map(|r| SearchResult {
559                    entry_id: r.content_id,
560                    content: r.text_content,
561                    score: r.combined_score,
562                    session_id: r.session_id.to_string(),
563                    content_type: format!("{:?}", r.content_type),
564                    metadata: std::collections::HashMap::new(),
565                })
566                .collect();
567
568            let total_matches = results.len() as u32;
569            Ok(Response::new(SemanticSearchResponse {
570                results,
571                total_matches,
572            }))
573        }
574
575        #[cfg(not(feature = "embeddings"))]
576        {
577            Err(Status::unimplemented(
578                "Semantic search requires the 'embeddings' feature",
579            ))
580        }
581    }
582
583    async fn find_related_content(
584        &self,
585        request: Request<FindRelatedContentRequest>,
586    ) -> Result<Response<FindRelatedContentResponse>, Status> {
587        let req = request.into_inner();
588        debug!(
589            "gRPC FindRelatedContent: session={}, topic='{}'",
590            req.session_id, req.topic
591        );
592
593        if req.topic.is_empty() {
594            return Err(Status::invalid_argument("topic cannot be empty"));
595        }
596
597        #[cfg(feature = "embeddings")]
598        {
599            let session_id = parse_uuid(&req.session_id)?;
600            let max_results = req.limit.unwrap_or(10) as usize;
601
602            let search_results = self
603                .memory
604                .semantic_search_session(session_id, &req.topic, Some(max_results), None, None)
605                .await
606                .map_err(|e| Status::internal(format!("Search failed: {e}")))?;
607
608            let results: Vec<SearchResult> = search_results
609                .into_iter()
610                .map(|r| SearchResult {
611                    entry_id: r.content_id,
612                    content: r.text_content,
613                    score: r.combined_score,
614                    session_id: r.session_id.to_string(),
615                    content_type: format!("{:?}", r.content_type),
616                    metadata: std::collections::HashMap::new(),
617                })
618                .collect();
619
620            Ok(Response::new(FindRelatedContentResponse { results }))
621        }
622
623        #[cfg(not(feature = "embeddings"))]
624        {
625            Err(Status::unimplemented(
626                "FindRelatedContent requires the 'embeddings' feature",
627            ))
628        }
629    }
630
631    async fn vectorize_session(
632        &self,
633        request: Request<VectorizeSessionRequest>,
634    ) -> Result<Response<VectorizeSessionResponse>, Status> {
635        let req = request.into_inner();
636        debug!("gRPC VectorizeSession: session_id={}", req.session_id);
637
638        let session_id = parse_uuid(&req.session_id)?;
639
640        #[cfg(feature = "embeddings")]
641        {
642            match self.memory.vectorize_session(session_id).await {
643                Ok(vectors_created) => Ok(Response::new(VectorizeSessionResponse {
644                    success: true,
645                    vectors_created: vectors_created as u32,
646                })),
647                Err(e) => {
648                    error!("gRPC VectorizeSession failed: {}", e);
649                    Err(Status::internal(e))
650                }
651            }
652        }
653
654        #[cfg(not(feature = "embeddings"))]
655        {
656            Err(Status::unimplemented(
657                "VectorizeSession requires the 'embeddings' feature",
658            ))
659        }
660    }
661
662    async fn get_vectorization_stats(
663        &self,
664        _request: Request<GetVectorizationStatsRequest>,
665    ) -> Result<Response<TextResponse>, Status> {
666        debug!("gRPC GetVectorizationStats");
667
668        #[cfg(feature = "embeddings")]
669        {
670            match self.memory.get_vectorization_stats() {
671                Ok(stats) => {
672                    let text =
673                        serde_json::to_string_pretty(&stats).unwrap_or_else(|_| "{}".to_string());
674                    Ok(Response::new(TextResponse { text }))
675                }
676                Err(e) => {
677                    error!("gRPC GetVectorizationStats failed: {}", e);
678                    Err(Status::internal(e))
679                }
680            }
681        }
682
683        #[cfg(not(feature = "embeddings"))]
684        {
685            Err(Status::unimplemented(
686                "GetVectorizationStats requires the 'embeddings' feature",
687            ))
688        }
689    }
690
691    // --- Analysis & Insights ---
692
693    async fn get_structured_summary(
694        &self,
695        request: Request<GetStructuredSummaryRequest>,
696    ) -> Result<Response<TextResponse>, Status> {
697        let req = request.into_inner();
698        debug!("gRPC GetStructuredSummary: session_id={}", req.session_id);
699
700        let session_id = parse_uuid(&req.session_id)?;
701        let session_arc = self.memory.get_session(session_id).await
702            .map_err(Status::not_found)?;
703        let session = session_arc.load();
704
705        use post_cortex_core::summary::{SummaryGenerator, SummaryOptions};
706        let user_requested_compact = req.compact.unwrap_or(false);
707        const MAX_TOKENS: usize = 50_000;
708
709        let (_estimated_tokens, should_compact) =
710            SummaryGenerator::estimate_summary_size(&session, MAX_TOKENS);
711        let auto_compacted = !user_requested_compact && should_compact;
712
713        let options = if user_requested_compact || auto_compacted {
714            SummaryOptions::compact()
715        } else {
716            SummaryOptions {
717                decisions_limit: req.decisions_limit.map(|v| v as usize),
718                entities_limit: req.entities_limit.map(|v| v as usize),
719                questions_limit: req.questions_limit.map(|v| v as usize),
720                concepts_limit: req.concepts_limit.map(|v| v as usize),
721                min_confidence: req.min_confidence,
722                compact: false,
723            }
724        };
725
726        let summary = SummaryGenerator::generate_structured_summary_filtered(&session, &options);
727        let text = serde_json::to_string_pretty(&summary)
728            .unwrap_or_else(|_| format!("{summary:?}"));
729
730        Ok(Response::new(TextResponse { text }))
731    }
732
733    async fn get_key_decisions(
734        &self,
735        request: Request<GetKeyDecisionsRequest>,
736    ) -> Result<Response<TextResponse>, Status> {
737        let req = request.into_inner();
738        debug!("gRPC GetKeyDecisions: session_id={}", req.session_id);
739
740        let session_id = parse_uuid(&req.session_id)?;
741        let session_arc = self.memory.get_session(session_id).await
742            .map_err(Status::not_found)?;
743        let session = session_arc.load();
744
745        use post_cortex_core::summary::SummaryGenerator;
746        let decisions = SummaryGenerator::extract_decision_timeline(&session);
747        let text = serde_json::to_string_pretty(&decisions)
748            .unwrap_or_else(|_| format!("{decisions:?}"));
749
750        Ok(Response::new(TextResponse { text }))
751    }
752
753    async fn get_key_insights(
754        &self,
755        request: Request<GetKeyInsightsRequest>,
756    ) -> Result<Response<TextResponse>, Status> {
757        let req = request.into_inner();
758        debug!("gRPC GetKeyInsights: session_id={}", req.session_id);
759
760        let session_id = parse_uuid(&req.session_id)?;
761        let session_arc = self.memory.get_session(session_id).await
762            .map_err(Status::not_found)?;
763        let session = session_arc.load();
764
765        use post_cortex_core::summary::SummaryGenerator;
766        let insights = SummaryGenerator::extract_key_insights(&session, req.limit.map(|v| v as usize).unwrap_or(5));
767        let text = serde_json::to_string_pretty(&insights)
768            .unwrap_or_else(|_| format!("{insights:?}"));
769
770        Ok(Response::new(TextResponse { text }))
771    }
772
773    async fn get_entity_importance(
774        &self,
775        request: Request<GetEntityImportanceRequest>,
776    ) -> Result<Response<TextResponse>, Status> {
777        let req = request.into_inner();
778        debug!("gRPC GetEntityImportance: session_id={}", req.session_id);
779
780        let session_id = parse_uuid(&req.session_id)?;
781        let session_arc = self.memory.get_session(session_id).await
782            .map_err(Status::not_found)?;
783        let session = session_arc.load();
784
785        let mut analysis = session.entity_graph.analyze_entity_importance();
786        if let Some(min_imp) = req.min_importance {
787            analysis.retain(|a| a.importance_score >= min_imp);
788        }
789        if let Some(limit) = req.limit {
790            analysis.truncate(limit as usize);
791        }
792        let text = serde_json::to_string_pretty(&analysis)
793            .unwrap_or_else(|_| format!("{analysis:?}"));
794
795        Ok(Response::new(TextResponse { text }))
796    }
797
798    async fn get_entity_network(
799        &self,
800        request: Request<GetEntityNetworkRequest>,
801    ) -> Result<Response<TextResponse>, Status> {
802        let req = request.into_inner();
803        debug!("gRPC GetEntityNetwork: session_id={}", req.session_id);
804
805        let session_id = parse_uuid(&req.session_id)?;
806        let session_arc = self.memory.get_session(session_id).await
807            .map_err(Status::not_found)?;
808        let session = session_arc.load();
809
810        let network = match req.center_entity {
811            Some(entity) => session.entity_graph.get_entity_network(&entity, 2),
812            None => {
813                let top_entities = session.entity_graph.get_most_important_entities(1);
814                if let Some(top) = top_entities.first() {
815                    session.entity_graph.get_entity_network(&top.name, 2)
816                } else {
817                    session.entity_graph.get_entity_network("", 2)
818                }
819            }
820        };
821        let text = serde_json::to_string_pretty(&network)
822            .unwrap_or_else(|_| format!("{network:?}"));
823
824        Ok(Response::new(TextResponse { text }))
825    }
826
827    async fn get_session_statistics(
828        &self,
829        request: Request<GetSessionStatisticsRequest>,
830    ) -> Result<Response<TextResponse>, Status> {
831        let req = request.into_inner();
832        debug!("gRPC GetSessionStatistics: session_id={}", req.session_id);
833
834        let session_id = parse_uuid(&req.session_id)?;
835        let session_arc = self.memory.get_session(session_id).await
836            .map_err(Status::not_found)?;
837        let session = session_arc.load();
838
839        use post_cortex_core::summary::SummaryGenerator;
840        let stats = SummaryGenerator::calculate_session_stats(&session);
841        let text = serde_json::to_string_pretty(&stats)
842            .unwrap_or_else(|_| format!("{stats:?}"));
843
844        Ok(Response::new(TextResponse { text }))
845    }
846
847    // --- Graph-Aware Context Assembly ---
848
849    async fn assemble_context(
850        &self,
851        request: Request<AssembleContextRequest>,
852    ) -> Result<Response<AssembleContextResponse>, Status> {
853        let req = request.into_inner();
854        debug!(
855            "gRPC AssembleContext: session_id={}, workspace_id={:?}, query={}",
856            req.session_id, req.workspace_id, req.query
857        );
858
859        let token_budget = if req.token_budget == 0 { 4000 } else { req.token_budget as usize };
860
861        use post_cortex_memory::context_assembly;
862        use post_cortex_core::graph::entity_graph::SimpleEntityGraph;
863
864        // Resolve scope: workspace_id takes precedence over session_id
865        let (updates, graph) = if req.workspace_id.as_deref().map(|s| !s.is_empty()).unwrap_or(false) {
866            // Workspace-scoped: merge context from all sessions in the workspace
867            let workspace_id = parse_uuid(req.workspace_id.as_deref().unwrap_or(""))?;
868            let workspace = self
869                .memory
870                .workspace_manager
871                .get_workspace(&workspace_id)
872                .ok_or_else(|| Status::not_found(format!("Workspace {workspace_id} not found")))?;
873
874            let mut all_updates = Vec::new();
875            let mut merged_graph = SimpleEntityGraph::new();
876
877            for (ws_session_id, _role) in workspace.get_all_sessions() {
878                match self.memory.get_session(ws_session_id).await {
879                    Ok(session_arc) => {
880                        let session = session_arc.load();
881                        all_updates.extend(
882                            session.hot_context.iter().iter().cloned(),
883                        );
884                        all_updates.extend(
885                            session.warm_context.iter().map(|c| c.update.clone()),
886                        );
887                        merged_graph.merge_from(&session.entity_graph);
888                    }
889                    Err(e) => {
890                        warn!(
891                            "AssembleContext: could not load session {} from workspace {}: {}",
892                            ws_session_id, workspace_id, e
893                        );
894                    }
895                }
896            }
897
898            (all_updates, merged_graph)
899        } else {
900            // Single-session (existing behaviour)
901            let session_id = parse_uuid(&req.session_id)?;
902            let session_arc = self.memory.get_session(session_id).await
903                .map_err(Status::not_found)?;
904            let session = session_arc.load();
905            let updates: Vec<_> = session.hot_context.iter().iter()
906                .chain(session.warm_context.iter().map(|c| &c.update))
907                .cloned()
908                .collect();
909            (updates, (*session.entity_graph).clone())
910        };
911
912        let assembled = context_assembly::assemble_context(
913            &req.query,
914            &graph,
915            &updates,
916            token_budget,
917        );
918
919        let formatted_text = context_assembly::format_for_llm(&assembled);
920
921        // Convert to proto types
922        let items: Vec<AssembledContextItem> = assembled.items.iter().map(|item| {
923            let (source, via_entity) = match &item.source {
924                context_assembly::ContextSource::SemanticMatch => ("semantic_match".to_string(), String::new()),
925                context_assembly::ContextSource::GraphTraversal { via_entity } => ("graph_traversal".to_string(), via_entity.clone()),
926                context_assembly::ContextSource::RecentUpdate => ("recent_update".to_string(), String::new()),
927            };
928            AssembledContextItem {
929                text: item.text.clone(),
930                score: item.score,
931                source,
932                via_entity,
933                entities: item.entities.clone(),
934                token_estimate: item.token_estimate as u32,
935                entry_id: item.entry_id.clone(),
936            }
937        }).collect();
938
939        let entity_context: Vec<AssembledEntityContext> = assembled.entity_context.iter().map(|ec| {
940            let (relevance, via_entity, via_relation) = match &ec.relevance {
941                context_assembly::EntityRelevance::DirectMention => ("direct_mention".to_string(), String::new(), String::new()),
942                context_assembly::EntityRelevance::GraphNeighbor { via, relation } => ("graph_neighbor".to_string(), via.clone(), relation.clone()),
943            };
944            let relationships: Vec<EntityRelation> = ec.relationships.iter().map(|r| {
945                EntityRelation {
946                    from_entity: r.from_entity.clone(),
947                    to_entity: r.to_entity.clone(),
948                    relation_type: format!("{:?}", r.relation_type),
949                    context: r.context.clone(),
950                }
951            }).collect();
952            AssembledEntityContext {
953                name: ec.name.clone(),
954                relevance,
955                via_entity,
956                via_relation,
957                relationships,
958            }
959        }).collect();
960
961        let impact: Vec<pb::ImpactEntry> = assembled.impact.iter().map(|i| {
962            pb::ImpactEntry {
963                entity: i.entity.clone(),
964                depends_on: i.depends_on.clone(),
965                relation_type: format!("{:?}", i.relation_type),
966                context: i.context.clone(),
967            }
968        }).collect();
969
970        Ok(Response::new(AssembleContextResponse {
971            items,
972            entity_context,
973            impact,
974            total_tokens: assembled.total_tokens as u32,
975            formatted_text,
976        }))
977    }
978
979    // --- Workspace Management ---
980
981    async fn create_workspace(
982        &self,
983        request: Request<CreateWorkspaceRequest>,
984    ) -> Result<Response<CreateWorkspaceResponse>, Status> {
985        let req = request.into_inner();
986        debug!("gRPC CreateWorkspace: name={}", req.name);
987
988        let workspace_id = self
989            .memory
990            .workspace_manager
991            .create_workspace(req.name.clone(), req.description.clone());
992
993        // Persist to storage
994        if let Err(e) = self
995            .memory
996            .save_workspace_metadata(workspace_id, &req.name, &req.description, &[])
997            .await
998        {
999            warn!("gRPC CreateWorkspace: failed to persist metadata: {}", e);
1000        }
1001
1002        Ok(Response::new(CreateWorkspaceResponse {
1003            workspace_id: workspace_id.to_string(),
1004        }))
1005    }
1006
1007    async fn get_workspace(
1008        &self,
1009        request: Request<GetWorkspaceRequest>,
1010    ) -> Result<Response<GetWorkspaceResponse>, Status> {
1011        let req = request.into_inner();
1012        debug!("gRPC GetWorkspace: workspace_id={}", req.workspace_id);
1013
1014        let workspace_id = parse_uuid(&req.workspace_id)?;
1015
1016        let workspace = self
1017            .memory
1018            .workspace_manager
1019            .get_workspace(&workspace_id)
1020            .ok_or_else(|| Status::not_found(format!("Workspace {} not found", workspace_id)))?;
1021
1022        let info = workspace_to_info(&workspace);
1023
1024        Ok(Response::new(GetWorkspaceResponse {
1025            workspace: Some(info),
1026        }))
1027    }
1028
1029    async fn list_workspaces(
1030        &self,
1031        _request: Request<ListWorkspacesRequest>,
1032    ) -> Result<Response<ListWorkspacesResponse>, Status> {
1033        debug!("gRPC ListWorkspaces");
1034
1035        let workspaces = self.memory.workspace_manager.list_workspaces();
1036
1037        let infos: Vec<WorkspaceInfo> = workspaces.iter().map(|ws| workspace_to_info(ws)).collect();
1038
1039        Ok(Response::new(ListWorkspacesResponse { workspaces: infos }))
1040    }
1041
1042    async fn delete_workspace(
1043        &self,
1044        request: Request<DeleteWorkspaceRequest>,
1045    ) -> Result<Response<DeleteWorkspaceResponse>, Status> {
1046        let req = request.into_inner();
1047        debug!("gRPC DeleteWorkspace: workspace_id={}", req.workspace_id);
1048
1049        let workspace_id = parse_uuid(&req.workspace_id)?;
1050
1051        let deleted = self
1052            .memory
1053            .workspace_manager
1054            .delete_workspace(&workspace_id);
1055
1056        let success = deleted.is_some();
1057        Ok(Response::new(DeleteWorkspaceResponse { success }))
1058    }
1059
1060    async fn add_session_to_workspace(
1061        &self,
1062        request: Request<AddSessionToWorkspaceRequest>,
1063    ) -> Result<Response<AddSessionToWorkspaceResponse>, Status> {
1064        let req = request.into_inner();
1065        debug!(
1066            "gRPC AddSessionToWorkspace: workspace={}, session={}, role={}",
1067            req.workspace_id, req.session_id, req.role
1068        );
1069
1070        let workspace_id = parse_uuid(&req.workspace_id)?;
1071        let session_id = parse_uuid(&req.session_id)?;
1072        let role = parse_session_role(&req.role);
1073
1074        self.memory
1075            .workspace_manager
1076            .add_session_to_workspace(&workspace_id, session_id, role)
1077            .map_err(Status::not_found)?;
1078
1079        // Persist updated workspace membership
1080        if let Some(ws) = self.memory.workspace_manager.get_workspace(&workspace_id) {
1081            let session_ids: Vec<Uuid> = ws.session_ids.iter().map(|entry| *entry.key()).collect();
1082            if let Err(e) = self
1083                .memory
1084                .save_workspace_metadata(workspace_id, &ws.name, &ws.description, &session_ids)
1085                .await
1086            {
1087                warn!(
1088                    "gRPC AddSessionToWorkspace: failed to persist metadata: {}",
1089                    e
1090                );
1091            }
1092        }
1093
1094        Ok(Response::new(AddSessionToWorkspaceResponse {
1095            success: true,
1096        }))
1097    }
1098
1099    async fn remove_session_from_workspace(
1100        &self,
1101        request: Request<RemoveSessionFromWorkspaceRequest>,
1102    ) -> Result<Response<RemoveSessionFromWorkspaceResponse>, Status> {
1103        let req = request.into_inner();
1104        debug!(
1105            "gRPC RemoveSessionFromWorkspace: workspace={}, session={}",
1106            req.workspace_id, req.session_id
1107        );
1108
1109        let workspace_id = parse_uuid(&req.workspace_id)?;
1110        let session_id = parse_uuid(&req.session_id)?;
1111
1112        self.memory
1113            .workspace_manager
1114            .remove_session_from_workspace(&workspace_id, &session_id)
1115            .map_err(Status::not_found)?;
1116
1117        // Persist updated workspace membership
1118        if let Some(ws) = self.memory.workspace_manager.get_workspace(&workspace_id) {
1119            let session_ids: Vec<Uuid> = ws.session_ids.iter().map(|entry| *entry.key()).collect();
1120            if let Err(e) = self
1121                .memory
1122                .save_workspace_metadata(workspace_id, &ws.name, &ws.description, &session_ids)
1123                .await
1124            {
1125                warn!(
1126                    "gRPC RemoveSessionFromWorkspace: failed to persist metadata: {}",
1127                    e
1128                );
1129            }
1130        }
1131
1132        Ok(Response::new(RemoveSessionFromWorkspaceResponse {
1133            success: true,
1134        }))
1135    }
1136
1137
1138    // --- Source Tracking (Phase 9): delegated to ./freshness.rs ---
1139
1140    async fn register_source(
1141        &self,
1142        request: Request<RegisterSourceRequest>,
1143    ) -> Result<Response<RegisterSourceAck>, Status> {
1144        self.register_source_impl(request).await
1145    }
1146
1147    async fn register_source_batch(
1148        &self,
1149        request: Request<RegisterSourceBatchRequest>,
1150    ) -> Result<Response<RegisterSourceBatchAck>, Status> {
1151        self.register_source_batch_impl(request).await
1152    }
1153
1154    async fn check_freshness(
1155        &self,
1156        request: Request<FreshnessRequest>,
1157    ) -> Result<Response<FreshnessReport>, Status> {
1158        self.check_freshness_impl(request).await
1159    }
1160
1161    async fn invalidate(
1162        &self,
1163        request: Request<InvalidateRequest>,
1164    ) -> Result<Response<InvalidateAck>, Status> {
1165        self.invalidate_impl(request).await
1166    }
1167
1168    async fn register_symbol_dependency(
1169        &self,
1170        request: Request<RegisterSymbolDependencyRequest>,
1171    ) -> Result<Response<RegisterSymbolDependencyAck>, Status> {
1172        self.register_symbol_dependency_impl(request).await
1173    }
1174
1175    async fn cascade_invalidate(
1176        &self,
1177        request: Request<CascadeInvalidateRequest>,
1178    ) -> Result<Response<CascadeInvalidateReport>, Status> {
1179        self.cascade_invalidate_impl(request).await
1180    }
1181
1182    async fn get_stale_entries_by_source(
1183        &self,
1184        request: Request<GetStaleEntriesBySourceRequest>,
1185    ) -> Result<Response<GetStaleEntriesBySourceResponse>, Status> {
1186        self.get_stale_entries_by_source_impl(request).await
1187    }
1188}
1189
1190/// Start the gRPC server on the given port.
1191/// Returns a future that runs until cancelled.
1192pub async fn start_grpc_server(
1193    memory: Arc<ConversationMemorySystem>,
1194    addr: std::net::SocketAddr,
1195) -> Result<(), String> {
1196    let service = PcxGrpcService::new(memory);
1197
1198    info!("Starting gRPC server on {}", addr);
1199
1200    tonic::transport::Server::builder()
1201        .add_service(service.into_server())
1202        .serve(addr)
1203        .await
1204        .map_err(|e| format!("gRPC server error: {e}"))
1205}