// synaps_cli/engine/setup.rs

//! Engine setup — boot sequence shared by TUI and headless modes.
//!
//! Extracts the initialization logic that was previously inlined in
//! `chatui/mod.rs` so both renderers can use the same boot path.
5
6use crate::{Runtime, Result, Session, latest_session, resolve_session};
7use crate::skills::registry::CommandRegistry;
8use crate::skills::keybinds::KeybindRegistry;
9use std::sync::Arc;
10use tokio::sync::RwLock;
11
/// Options for engine boot.
///
/// `Default` yields "start a fresh session with no overrides": every
/// optional field is `None` and `no_extensions` is `false`.
#[derive(Debug, Clone, Default)]
pub struct EngineOpts {
    /// Session continuation request:
    /// - `None` — start a new session;
    /// - `Some(None)` — continue the latest session;
    /// - `Some(Some(query))` — continue the session resolved from `query`.
    pub continue_session: Option<Option<String>>,
    /// Optional system prompt override, handed to
    /// `crate::config::resolve_system_prompt` during boot.
    pub system: Option<String>,
    /// Profile to activate (via `crate::config::set_profile`) before
    /// anything else is loaded. Left untouched when `None`.
    pub profile: Option<String>,
    /// Disable extensions. NOTE(review): `boot` does not read this flag
    /// itself — presumably the caller honours it; confirm.
    pub no_extensions: bool,
}
19
/// Background tasks spawned during boot. Aborts on drop.
///
/// Owns the shutdown flags and join handles for the inbox watcher and the
/// per-session socket listener. `shutdown()` raises both flags and
/// unregisters the session; dropping the value raises both flags and
/// aborts both tasks (it does not unregister the session).
pub struct BackgroundTasks {
    /// Stop flag shared with the inbox watcher task.
    watcher_shutdown: Arc<std::sync::atomic::AtomicBool>,
    /// Handle of the spawned inbox watcher task.
    watcher_task: tokio::task::JoinHandle<()>,
    /// Stop flag shared with the session socket listener.
    socket_shutdown: Arc<std::sync::atomic::AtomicBool>,
    /// Handle of the session socket listener task.
    socket_task: tokio::task::JoinHandle<()>,
    /// Path of this session's socket, as computed during boot.
    #[allow(dead_code)] // stored for potential future use (e.g. reconnect)
    session_socket_path: String,
    /// ID of the session registered during boot; `shutdown()` uses it to
    /// unregister the session.
    session_id: String,
    /// File-appender flush guard. Holding this for the lifetime of the
    /// renderer keeps the non-blocking log writer's background thread
    /// alive — without it, log lines emitted after `boot()` returns can
    /// be silently dropped before they reach disk. Dropped last when
    /// BackgroundTasks drops (fields drop in declaration order, and this
    /// is the final field — keep it last).
    #[allow(dead_code)]
    log_guard: Option<tracing_appender::non_blocking::WorkerGuard>,
}
37
38impl BackgroundTasks {
39    /// Signal all tasks to stop and unregister the session.
40    pub fn shutdown(&self) {
41        self.watcher_shutdown.store(true, std::sync::atomic::Ordering::Release);
42        self.socket_shutdown.store(true, std::sync::atomic::Ordering::Release);
43        crate::events::registry::unregister_session(&self.session_id);
44    }
45}
46
47impl Drop for BackgroundTasks {
48    fn drop(&mut self) {
49        self.watcher_shutdown.store(true, std::sync::atomic::Ordering::Relaxed);
50        self.socket_shutdown.store(true, std::sync::atomic::Ordering::Relaxed);
51        self.watcher_task.abort();
52        self.socket_task.abort();
53    }
54}
55
/// Result of the boot sequence — everything a renderer needs to start.
pub struct EngineBoot {
    /// Runtime with the loaded config and the resolved system prompt
    /// applied (and, for a continued session, that session's model and
    /// system prompt).
    pub runtime: Runtime,
    /// Configuration as returned by `crate::config::load_config`.
    pub config: crate::SynapsConfig,
    /// The continued session, or a freshly created one.
    pub session: Session,
    /// API message history: cloned from the continued session, empty for
    /// a new session.
    pub api_messages: Vec<serde_json::Value>,
    /// Input token total carried over from a continued session (0 for new).
    pub total_input_tokens: u64,
    /// Output token total carried over from a continued session (0 for new).
    pub total_output_tokens: u64,
    /// Session cost carried over from a continued session (0.0 for new).
    pub session_cost: f64,
    /// Abort context copied from a continued session; `None` for new.
    pub abort_context: Option<String>,
    /// `true` when an existing session was continued.
    pub continued: bool,
    /// How the continued session was resolved. Only set when a specific
    /// session query was supplied (not for "continue latest" or new).
    pub continue_info: Option<ContinueInfo>,
    /// Command registry built by `crate::skills::register`.
    pub registry: Arc<CommandRegistry>,
    /// Keybind registry. Uses std::sync::RwLock (not tokio) because keybind
    /// lookups are synchronous, fast, and called from input handling code
    /// that cannot await. This is safe as long as the lock is never held
    /// across an await point.
    pub keybind_registry: Arc<std::sync::RwLock<KeybindRegistry>>,
    /// Count returned by `crate::mcp::setup_lazy_mcp`.
    pub mcp_server_count: usize,
    /// Result of `crate::config::resolve_read_path("system.md")`.
    pub system_prompt_path: std::path::PathBuf,
    /// Extension manager, created with the runtime's hook bus and shared
    /// tools and also installed for OpenAI routing during boot.
    pub ext_manager: Arc<RwLock<crate::extensions::manager::ExtensionManager>>,
    /// Background tasks — inbox watcher, socket listener. Aborts on drop.
    pub background: BackgroundTasks,
}
80
/// Info about how a continued session was resolved.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ContinueInfo {
    /// ID of the session that was actually loaded.
    pub session_id: String,
    /// Which lookup matched the query: `Some("chain")`, `Some("name")`,
    /// or `None` (the query was the session ID itself, or neither lookup
    /// succeeded).
    pub resolved_via: Option<String>,
    /// The query string the user supplied.
    pub query: String,
}
87
88/// Run the full engine boot sequence:
89/// config → system prompt → skills → MCP → session → sockets → extensions
90pub async fn boot(opts: EngineOpts) -> Result<EngineBoot> {
91    if let Some(ref prof) = opts.profile {
92        crate::config::set_profile(Some(prof.clone()));
93    }
94
95    // Capture the WorkerGuard from the file appender. tracing-appender's
96    // non-blocking writer uses a background flush thread; the guard is
97    // an RAII handle that stops that thread on drop. The previous code
98    // dropped it at the end of boot() with a comment claiming "this is
99    // fine because tracing-subscriber uses a global subscriber" — which
100    // is true for the subscriber, but NOT for the file appender's
101    // background thread. With the guard dropped, log lines emitted after
102    // boot() returned (Extension loaded, hook traces, etc.) could be
103    // silently lost. We hand the guard down through EngineBoot so the
104    // renderer (TUI / chat / server) keeps it alive for its lifetime.
105    let log_guard = crate::logging::init_logging();
106    let mut runtime = Runtime::new().await?;
107
108    // Load config and apply
109    let config = crate::config::load_config();
110    runtime.apply_config(&config);
111
112    // Load system prompt
113    let system_prompt = crate::config::resolve_system_prompt(opts.system.as_deref());
114    runtime.set_system_prompt(system_prompt);
115
116    // Discover plugins/skills, build command registry, register load_skill tool.
117    let tools_shared = runtime.tools_shared();
118    let (registry, keybind_registry) = crate::skills::register(&tools_shared, &config).await;
119
120    // Set up lazy MCP loading (if configured in ~/.synaps-cli/mcp.json)
121    let mcp_server_count = crate::mcp::setup_lazy_mcp(&runtime.tools_shared()).await;
122
123    let system_prompt_path = crate::config::resolve_read_path("system.md");
124
125    // Session: continue existing or create new
126    let sb = resolve_or_create_session(&mut runtime, &opts.continue_session)?;
127
128    // Start inbox watcher
129    let watcher_shutdown = Arc::new(std::sync::atomic::AtomicBool::new(false));
130    let watcher_task = {
131        let inbox_dir = crate::config::base_dir().join("inbox");
132        let event_queue = runtime.event_queue().clone();
133        let shutdown = watcher_shutdown.clone();
134        tokio::spawn(async move {
135            crate::events::watch_inbox(inbox_dir, event_queue, shutdown).await;
136        })
137    };
138
139    // Helper: abort background tasks on error
140    let abort_tasks = |ws: &Arc<std::sync::atomic::AtomicBool>, wt: &tokio::task::JoinHandle<()>| {
141        ws.store(true, std::sync::atomic::Ordering::Relaxed);
142        wt.abort();
143    };
144
145    // Start per-session Unix socket listener + register in session registry
146    let socket_shutdown = Arc::new(std::sync::atomic::AtomicBool::new(false));
147    let session_socket_path = crate::events::registry::socket_path_for_session(&sb.session.id);
148    let socket_task = crate::events::socket::listen_session_socket(
149        session_socket_path.clone(),
150        runtime.event_queue().clone(),
151        socket_shutdown.clone(),
152    );
153    let session_registration = crate::events::registry::SessionRegistration {
154        session_id: sb.session.id.clone(),
155        name: sb.session.name.clone(),
156        socket_path: session_socket_path.clone(),
157        pid: std::process::id(),
158        started_at: chrono::Utc::now(),
159    };
160    if let Err(e) = crate::events::registry::register_session(&session_registration) {
161        abort_tasks(&watcher_shutdown, &watcher_task);
162        socket_shutdown.store(true, std::sync::atomic::Ordering::Relaxed);
163        socket_task.abort();
164        // Fail loudly: returning Ok with already-aborted handles silently
165        // poisoned downstream — server inherited dead watcher/socket tasks
166        // and a session that wasn't in the registry, so other tools couldn't
167        // see it. Better to fail boot than start in a broken state.
168        return Err(crate::core::error::RuntimeError::Session(format!(
169            "failed to register session {}: {}",
170            session_registration.session_id, e
171        )));
172    }
173
174    // Extension manager
175    let ext_mgr = crate::extensions::manager::ExtensionManager::new_with_tools(
176        Arc::clone(runtime.hook_bus()),
177        runtime.tools_shared(),
178    );
179    let ext_manager = Arc::new(RwLock::new(ext_mgr));
180    crate::runtime::openai::set_extension_manager_for_routing(Arc::clone(&ext_manager));
181
182    // Session start hook
183    {
184        let mut index_record = crate::core::session_index::SessionIndexRecord::start(&sb.session.id);
185        index_record.model = Some(sb.session.model.clone());
186        index_record.profile = crate::core::config::get_profile();
187        index_record.cwd = std::env::current_dir().ok();
188        if let Err(err) = crate::core::session_index::append_record(&index_record) {
189            tracing::warn!("failed to append session start index record: {}", err);
190        }
191
192        let hook_event = crate::extensions::hooks::events::HookEvent::on_session_start(&sb.session.id);
193        let _ = runtime.hook_bus().emit(&hook_event).await;
194    }
195
196    if mcp_server_count > 0 {
197        tracing::info!("{} MCP servers available (use connect_mcp_server to activate)", mcp_server_count);
198    }
199
200    let session_id = sb.session.id.clone();
201
202    Ok(EngineBoot {
203        runtime,
204        config,
205        session: sb.session,
206        api_messages: sb.api_messages,
207        total_input_tokens: sb.total_input_tokens,
208        total_output_tokens: sb.total_output_tokens,
209        session_cost: sb.session_cost,
210        abort_context: sb.abort_context,
211        continued: sb.continued,
212        continue_info: sb.continue_info,
213        registry,
214        keybind_registry,
215        mcp_server_count,
216        system_prompt_path,
217        ext_manager,
218        background: BackgroundTasks {
219            watcher_shutdown,
220            watcher_task,
221            socket_shutdown,
222            socket_task,
223            session_socket_path,
224            session_id,
225            log_guard,
226        },
227    })
228}
229
/// Result of session resolution: the session itself plus the state
/// copied out of it (or zeroed, for a new session) that `boot` forwards
/// into `EngineBoot`.
struct SessionBootResult {
    /// The continued or newly created session.
    session: Session,
    /// Message history: cloned from a continued session, empty for new.
    api_messages: Vec<serde_json::Value>,
    /// Input token total from a continued session; 0 for new.
    total_input_tokens: u64,
    /// Output token total from a continued session; 0 for new.
    total_output_tokens: u64,
    /// Cost from a continued session; 0.0 for new.
    session_cost: f64,
    /// Abort context from a continued session; `None` for new.
    abort_context: Option<String>,
    /// `true` when an existing session was continued.
    continued: bool,
    /// Set only when a specific session query was supplied.
    continue_info: Option<ContinueInfo>,
}
242
243fn resolve_or_create_session(
244    runtime: &mut Runtime,
245    continue_session: &Option<Option<String>>,
246) -> Result<SessionBootResult> {
247    match continue_session {
248        Some(ref maybe_id) => {
249            let session = match maybe_id {
250                Some(ref id) => resolve_session(id).map_err(|e| {
251                    crate::error::RuntimeError::Tool(format!("Failed to load session '{}': {}", id, e))
252                })?,
253                None => latest_session().map_err(|e| {
254                    crate::error::RuntimeError::Tool(format!("No sessions to continue: {}", e))
255                })?,
256            };
257            runtime.set_model(session.model.clone());
258            if let Some(ref sp) = session.system_prompt {
259                runtime.set_system_prompt(sp.clone());
260            }
261
262            let continue_info = maybe_id.as_ref().map(|q| {
263                let resolved_via = if *q != session.id {
264                    if crate::chain::load_chain(q).is_ok() {
265                        Some("chain".to_string())
266                    } else if crate::session::find_session_by_name(q).is_ok() {
267                        Some("name".to_string())
268                    } else {
269                        None
270                    }
271                } else {
272                    None
273                };
274                ContinueInfo {
275                    session_id: session.id.clone(),
276                    resolved_via,
277                    query: q.clone(),
278                }
279            });
280
281            Ok(SessionBootResult {
282                api_messages: session.api_messages.clone(),
283                total_input_tokens: session.total_input_tokens,
284                total_output_tokens: session.total_output_tokens,
285                session_cost: session.session_cost,
286                abort_context: session.abort_context.clone(),
287                continued: true,
288                continue_info,
289                session,
290            })
291        }
292        None => {
293            let session = Session::new(runtime.model(), runtime.thinking_level(), runtime.system_prompt());
294            Ok(SessionBootResult {
295                session,
296                api_messages: Vec::new(),
297                total_input_tokens: 0,
298                total_output_tokens: 0,
299                session_cost: 0.0,
300                abort_context: None,
301                continued: false,
302                continue_info: None,
303            })
304        }
305    }
306}