Skip to main content

lean_ctx/tools/
server_lifecycle.rs

1use std::path::Path;
2use std::sync::atomic::AtomicUsize;
3use std::sync::Arc;
4use std::time::Instant;
5use tokio::sync::RwLock;
6
7use crate::core::cache::SessionCache;
8use crate::core::session::SessionState;
9
10use super::autonomy;
11use super::server::{LeanCtxServer, SessionMode};
12use super::startup::detect_startup_context;
13
14impl Default for LeanCtxServer {
15    fn default() -> Self {
16        Self::new()
17    }
18}
19
20impl LeanCtxServer {
21    /// Creates a new server with default settings, auto-detecting the project root.
22    pub fn new() -> Self {
23        Self::new_with_project_root(None)
24    }
25
26    /// Creates a new server rooted at the given project directory.
27    pub fn new_with_project_root(project_root: Option<&str>) -> Self {
28        Self::new_with_startup(
29            project_root,
30            std::env::current_dir().ok().as_deref(),
31            SessionMode::Personal,
32            "default",
33            "default",
34        )
35    }
36
37    /// Creates a new server in Context OS shared mode for a specific workspace/channel.
38    pub fn new_shared_with_context(
39        project_root: &str,
40        workspace_id: &str,
41        channel_id: &str,
42    ) -> Self {
43        Self::new_with_startup(
44            Some(project_root),
45            std::env::current_dir().ok().as_deref(),
46            SessionMode::Shared,
47            workspace_id,
48            channel_id,
49        )
50    }
51
52    pub(crate) fn new_with_startup(
53        project_root: Option<&str>,
54        startup_cwd: Option<&Path>,
55        session_mode: SessionMode,
56        workspace_id: &str,
57        channel_id: &str,
58    ) -> Self {
59        let ttl = std::env::var("LEAN_CTX_CACHE_TTL")
60            .ok()
61            .and_then(|v| v.parse().ok())
62            .unwrap_or_else(|| {
63                let cfg = crate::core::config::Config::load();
64                crate::core::config::MemoryCleanup::effective(&cfg).idle_ttl_secs()
65            });
66
67        // Purge stale graph indices on startup to prevent serving outdated data
68        crate::core::graph_index::ProjectIndex::purge_stale_indices();
69
70        // Start the RAM guardian — monitors RSS and triggers tiered eviction.
71        // At Critical pressure (>3x limit), performs emergency shutdown.
72        crate::core::memory_guard::start_guard(std::sync::Arc::new(|level| {
73            use crate::core::memory_guard::PressureLevel;
74            match level {
75                PressureLevel::Soft => {
76                    tracing::info!("[memory_guard] soft pressure — deferring new index builds");
77                }
78                PressureLevel::Medium | PressureLevel::Hard | PressureLevel::Critical => {
79                    tracing::warn!(
80                        "[memory_guard] {:?} pressure — purging jemalloc arenas",
81                        level,
82                    );
83                    crate::core::memory_guard::jemalloc_purge();
84                }
85                PressureLevel::Normal => {}
86            }
87        }));
88
89        let startup = detect_startup_context(project_root, startup_cwd);
90        let (session, context_os) = match session_mode {
91            SessionMode::Personal => {
92                let mut session = if let Some(ref root) = startup.project_root {
93                    SessionState::load_latest_for_project_root(root).unwrap_or_default()
94                } else {
95                    SessionState::load_latest().unwrap_or_default()
96                };
97                if let Some(ref root) = startup.project_root {
98                    session.project_root = Some(root.clone());
99                }
100                if let Some(ref cwd) = startup.shell_cwd {
101                    session.shell_cwd = Some(cwd.clone());
102                }
103                (Arc::new(RwLock::new(session)), None)
104            }
105            SessionMode::Shared => {
106                let Some(ref root) = startup.project_root else {
107                    // Shared mode without a project root is not useful; fall back to personal.
108                    return Self::new_with_startup(
109                        project_root,
110                        startup_cwd,
111                        SessionMode::Personal,
112                        workspace_id,
113                        channel_id,
114                    );
115                };
116                let rt = crate::core::context_os::runtime();
117                let session = rt
118                    .shared_sessions
119                    .get_or_load(root, workspace_id, channel_id);
120                rt.metrics.record_session_loaded();
121                // Ensure shell_cwd is refreshed (best-effort).
122                if let Some(ref cwd) = startup.shell_cwd {
123                    if let Ok(mut s) = session.try_write() {
124                        s.shell_cwd = Some(cwd.clone());
125                    }
126                }
127                (session, Some(rt))
128            }
129        };
130
131        if let Some(ref root) = startup.project_root {
132            crate::core::index_orchestrator::ensure_all_background(root);
133        }
134
135        Self {
136            cache: Arc::new(RwLock::new(SessionCache::new())),
137            session,
138            tool_calls: Arc::new(RwLock::new(Vec::new())),
139            call_count: Arc::new(AtomicUsize::new(0)),
140            cache_ttl_secs: ttl,
141            last_call: Arc::new(RwLock::new(Instant::now())),
142            agent_id: Arc::new(RwLock::new(None)),
143            client_name: Arc::new(RwLock::new(String::new())),
144            autonomy: Arc::new(autonomy::AutonomyState::new()),
145            loop_detector: Arc::new(RwLock::new(
146                crate::core::loop_detection::LoopDetector::with_config(
147                    &crate::core::config::Config::load().loop_detection,
148                ),
149            )),
150            workflow: Arc::new(RwLock::new(
151                crate::core::workflow::load_active().ok().flatten(),
152            )),
153            ledger: Arc::new(RwLock::new(
154                crate::core::context_ledger::ContextLedger::load(),
155            )),
156            pipeline_stats: Arc::new(RwLock::new(crate::core::pipeline::PipelineStats::new())),
157            session_mode,
158            workspace_id: if workspace_id.trim().is_empty() {
159                "default".to_string()
160            } else {
161                workspace_id.trim().to_string()
162            },
163            channel_id: if channel_id.trim().is_empty() {
164                "default".to_string()
165            } else {
166                channel_id.trim().to_string()
167            },
168            context_os,
169            context_ir: Some(std::sync::Arc::new(tokio::sync::RwLock::new(
170                crate::core::context_ir::ContextIrV1::load(),
171            ))),
172            registry: Some(std::sync::Arc::new(
173                crate::server::registry::build_registry(),
174            )),
175            rules_stale_checked: Arc::new(std::sync::atomic::AtomicBool::new(false)),
176            last_seen_event_id: Arc::new(std::sync::atomic::AtomicI64::new(0)),
177            startup_project_root: startup.project_root,
178            startup_shell_cwd: startup.shell_cwd,
179            peer: Arc::new(tokio::sync::RwLock::new(None)),
180            has_client_roots: Arc::new(std::sync::atomic::AtomicBool::new(false)),
181            roots_resolved: Arc::new(std::sync::atomic::AtomicBool::new(false)),
182            bm25_cache: Arc::new(std::sync::Mutex::new(None)),
183            progress_sender: Arc::new(std::sync::Mutex::new(None)),
184        }
185    }
186
187    /// Clears the cache and saves the session if the TTL idle threshold has been exceeded.
188    pub async fn check_idle_expiry(&self) {
189        if self.cache_ttl_secs == 0 {
190            return;
191        }
192        let last = *self.last_call.read().await;
193        if last.elapsed().as_secs() >= self.cache_ttl_secs {
194            {
195                let mut session = self.session.write().await;
196                let _ = session.save();
197            }
198            let mut cache = self.cache.write().await;
199            let count = cache.clear();
200            if count > 0 {
201                tracing::info!(
202                    "Cache auto-cleared after {}s idle ({count} file(s))",
203                    self.cache_ttl_secs
204                );
205            }
206        }
207        *self.last_call.write().await = Instant::now();
208    }
209
210    /// Aggressive cleanup on connection drop: save session, consolidate knowledge, clear caches.
211    pub async fn shutdown(&self) {
212        {
213            let session = self.session.read().await;
214            let has_insights = !session.findings.is_empty() || !session.decisions.is_empty();
215            let root = session.project_root.clone();
216            drop(session);
217
218            if has_insights {
219                if let Some(ref root) = root {
220                    crate::tools::startup::auto_consolidate_knowledge(root);
221                }
222            }
223        }
224        {
225            let mut session = self.session.write().await;
226            let _ = session.save();
227        }
228        {
229            let mut cache = self.cache.write().await;
230            let count = cache.clear();
231            if count > 0 {
232                tracing::info!("[shutdown] cleared {count} cached file(s)");
233            }
234        }
235        crate::core::memory_guard::force_purge();
236    }
237}