Skip to main content

agent_engine/engine/
setup.rs

1//! Engine setup — boot sequence shared by TUI and headless modes.
2//!
3//! Extracts the initialization logic that was previously inlined in
4//! chatui/mod.rs so both renderers can use the same boot path.
5
6use crate::{Runtime, Result, Session, latest_session, resolve_session};
7use crate::skills::registry::CommandRegistry;
8use crate::skills::keybinds::KeybindRegistry;
9use std::sync::Arc;
10use tokio::sync::RwLock;
11
12/// Options for engine boot.
13pub struct EngineOpts {
14    pub continue_session: Option<Option<String>>,
15    pub system: Option<String>,
16    pub profile: Option<String>,
17    pub no_extensions: bool,
18}
19
20/// Background tasks spawned during boot. Aborts on drop.
21pub struct BackgroundTasks {
22    watcher_shutdown: Arc<std::sync::atomic::AtomicBool>,
23    watcher_task: tokio::task::JoinHandle<()>,
24    socket_shutdown: Arc<std::sync::atomic::AtomicBool>,
25    socket_task: tokio::task::JoinHandle<()>,
26    #[allow(dead_code)] // stored for potential future use (e.g. reconnect)
27    session_socket_path: String,
28    session_id: String,
29    /// File-appender flush guard. Holding this for the lifetime of the
30    /// renderer keeps the non-blocking log writer's background thread
31    /// alive — without it, log lines emitted after `boot()` returns can
32    /// be silently dropped before they reach disk. Dropped last when
33    /// BackgroundTasks drops.
34    #[allow(dead_code)]
35    log_guard: Option<tracing_appender::non_blocking::WorkerGuard>,
36}
37
38impl BackgroundTasks {
39    /// Signal all tasks to stop and unregister the session.
40    pub fn shutdown(&self) {
41        self.watcher_shutdown.store(true, std::sync::atomic::Ordering::Release);
42        self.socket_shutdown.store(true, std::sync::atomic::Ordering::Release);
43        crate::events::registry::unregister_session(&self.session_id);
44    }
45}
46
47impl Drop for BackgroundTasks {
48    fn drop(&mut self) {
49        self.watcher_shutdown.store(true, std::sync::atomic::Ordering::Relaxed);
50        self.socket_shutdown.store(true, std::sync::atomic::Ordering::Relaxed);
51        self.watcher_task.abort();
52        self.socket_task.abort();
53    }
54}
55
56/// Result of the boot sequence — everything a renderer needs to start.
57pub struct EngineBoot {
58    pub runtime: Runtime,
59    pub config: crate::SynapsConfig,
60    /// Echo of EngineOpts.no_extensions — callers gate extension discovery
61    /// on this so the flag has one source of truth.
62    pub no_extensions: bool,
63    pub session: Session,
64    pub api_messages: Vec<serde_json::Value>,
65    pub total_input_tokens: u64,
66    pub total_output_tokens: u64,
67    pub session_cost: f64,
68    pub abort_context: Option<String>,
69    pub continued: bool,
70    pub continue_info: Option<ContinueInfo>,
71    pub registry: Arc<CommandRegistry>,
72    /// Keybind registry. Uses std::sync::RwLock (not tokio) because keybind
73    /// lookups are synchronous, fast, and called from input handling code
74    /// that cannot await. This is safe as long as the lock is never held
75    /// across an await point.
76    pub keybind_registry: Arc<std::sync::RwLock<KeybindRegistry>>,
77    pub mcp_server_count: usize,
78    pub system_prompt_path: std::path::PathBuf,
79    pub ext_manager: Arc<RwLock<crate::extensions::manager::ExtensionManager>>,
80    /// Background tasks — inbox watcher, socket listener. Aborts on drop.
81    pub background: BackgroundTasks,
82}
83
84/// Info about how a continued session was resolved.
85pub struct ContinueInfo {
86    pub session_id: String,
87    pub resolved_via: Option<String>, // "chain", "name", or None
88    pub query: String,
89}
90
91/// Run the full engine boot sequence:
92/// config → system prompt → skills → MCP → session → sockets → extensions
93pub async fn boot(opts: EngineOpts) -> Result<EngineBoot> {
94    if let Some(ref prof) = opts.profile {
95        crate::config::set_profile(Some(prof.clone()));
96    }
97
98    // Capture the WorkerGuard from the file appender. tracing-appender's
99    // non-blocking writer uses a background flush thread; the guard is
100    // an RAII handle that stops that thread on drop. The previous code
101    // dropped it at the end of boot() with a comment claiming "this is
102    // fine because tracing-subscriber uses a global subscriber" — which
103    // is true for the subscriber, but NOT for the file appender's
104    // background thread. With the guard dropped, log lines emitted after
105    // boot() returned (Extension loaded, hook traces, etc.) could be
106    // silently lost. We hand the guard down through EngineBoot so the
107    // renderer (TUI / chat / server) keeps it alive for its lifetime.
108    let log_guard = crate::logging::init_logging();
109    let mut runtime = Runtime::new().await?;
110
111    // Load config and apply
112    let config = crate::config::load_config();
113    runtime.apply_config(&config);
114
115    // Load system prompt
116    let system_prompt = crate::config::resolve_system_prompt(opts.system.as_deref());
117    runtime.set_system_prompt(system_prompt);
118
119    // Discover plugins/skills, build command registry, register load_skill tool.
120    let tools_shared = runtime.tools_shared();
121    let (registry, keybind_registry) = crate::skills::register(&tools_shared, &config).await;
122
123    // Set up lazy MCP loading (if configured in ~/.synaps-cli/mcp.json)
124    let mcp_server_count = crate::mcp::setup_lazy_mcp(&runtime.tools_shared()).await;
125
126    let system_prompt_path = crate::config::resolve_read_path("system.md");
127
128    // Session: continue existing or create new
129    let sb = resolve_or_create_session(&mut runtime, &opts.continue_session)?;
130
131    // Start inbox watcher
132    let watcher_shutdown = Arc::new(std::sync::atomic::AtomicBool::new(false));
133    let watcher_task = {
134        let inbox_dir = crate::config::base_dir().join("inbox");
135        let event_queue = runtime.event_queue().clone();
136        let shutdown = watcher_shutdown.clone();
137        tokio::spawn(async move {
138            crate::events::watch_inbox(inbox_dir, event_queue, shutdown).await;
139        })
140    };
141
142    // Helper: abort background tasks on error
143    let abort_tasks = |ws: &Arc<std::sync::atomic::AtomicBool>, wt: &tokio::task::JoinHandle<()>| {
144        ws.store(true, std::sync::atomic::Ordering::Relaxed);
145        wt.abort();
146    };
147
148    // Start per-session Unix socket listener + register in session registry
149    let socket_shutdown = Arc::new(std::sync::atomic::AtomicBool::new(false));
150    let session_socket_path = crate::events::registry::socket_path_for_session(&sb.session.id);
151    let socket_task = crate::events::socket::listen_session_socket(
152        session_socket_path.clone(),
153        runtime.event_queue().clone(),
154        socket_shutdown.clone(),
155    );
156    let session_registration = crate::events::registry::SessionRegistration {
157        session_id: sb.session.id.clone(),
158        name: sb.session.name.clone(),
159        socket_path: session_socket_path.clone(),
160        pid: std::process::id(),
161        started_at: chrono::Utc::now(),
162    };
163    if let Err(e) = crate::events::registry::register_session(&session_registration) {
164        abort_tasks(&watcher_shutdown, &watcher_task);
165        socket_shutdown.store(true, std::sync::atomic::Ordering::Relaxed);
166        socket_task.abort();
167        // Fail loudly: returning Ok with already-aborted handles silently
168        // poisoned downstream — server inherited dead watcher/socket tasks
169        // and a session that wasn't in the registry, so other tools couldn't
170        // see it. Better to fail boot than start in a broken state.
171        return Err(crate::core::error::RuntimeError::Session(format!(
172            "failed to register session {}: {}",
173            session_registration.session_id, e
174        )));
175    }
176
177    // Extension manager
178    let ext_mgr = crate::extensions::manager::ExtensionManager::new_with_tools(
179        Arc::clone(runtime.hook_bus()),
180        runtime.tools_shared(),
181    );
182    let ext_manager = Arc::new(RwLock::new(ext_mgr));
183    crate::runtime::openai::set_extension_manager_for_routing(Arc::clone(&ext_manager));
184
185    // Session start hook
186    {
187        let mut index_record = crate::core::session_index::SessionIndexRecord::start(&sb.session.id);
188        index_record.model = Some(sb.session.model.clone());
189        index_record.profile = crate::core::config::get_profile();
190        index_record.cwd = std::env::current_dir().ok();
191        if let Err(err) = crate::core::session_index::append_record(&index_record) {
192            tracing::warn!("failed to append session start index record: {}", err);
193        }
194
195        let hook_event = crate::extensions::hooks::events::HookEvent::on_session_start(&sb.session.id);
196        let _ = runtime.hook_bus().emit(&hook_event).await;
197    }
198
199    if mcp_server_count > 0 {
200        tracing::info!("{} MCP servers available (use connect_mcp_server to activate)", mcp_server_count);
201    }
202
203    let session_id = sb.session.id.clone();
204
205    Ok(EngineBoot {
206        runtime,
207        config,
208        no_extensions: opts.no_extensions,
209        session: sb.session,
210        api_messages: sb.api_messages,
211        total_input_tokens: sb.total_input_tokens,
212        total_output_tokens: sb.total_output_tokens,
213        session_cost: sb.session_cost,
214        abort_context: sb.abort_context,
215        continued: sb.continued,
216        continue_info: sb.continue_info,
217        registry,
218        keybind_registry,
219        mcp_server_count,
220        system_prompt_path,
221        ext_manager,
222        background: BackgroundTasks {
223            watcher_shutdown,
224            watcher_task,
225            socket_shutdown,
226            socket_task,
227            session_socket_path,
228            session_id,
229            log_guard,
230        },
231    })
232}
233
234/// Resolve a session to continue, or create a new one.
235/// Result of session resolution.
236struct SessionBootResult {
237    session: Session,
238    api_messages: Vec<serde_json::Value>,
239    total_input_tokens: u64,
240    total_output_tokens: u64,
241    session_cost: f64,
242    abort_context: Option<String>,
243    continued: bool,
244    continue_info: Option<ContinueInfo>,
245}
246
247fn resolve_or_create_session(
248    runtime: &mut Runtime,
249    continue_session: &Option<Option<String>>,
250) -> Result<SessionBootResult> {
251    match continue_session {
252        Some(ref maybe_id) => {
253            let session = match maybe_id {
254                Some(ref id) => resolve_session(id).map_err(|e| {
255                    crate::error::RuntimeError::Tool(format!("Failed to load session '{}': {}", id, e))
256                })?,
257                None => latest_session().map_err(|e| {
258                    crate::error::RuntimeError::Tool(format!("No sessions to continue: {}", e))
259                })?,
260            };
261            runtime.set_model(session.model.clone());
262            if let Some(ref sp) = session.system_prompt {
263                runtime.set_system_prompt(sp.clone());
264            }
265
266            let continue_info = maybe_id.as_ref().map(|q| {
267                let resolved_via = if *q != session.id {
268                    if crate::chain::load_chain(q).is_ok() {
269                        Some("chain".to_string())
270                    } else if crate::session::find_session_by_name(q).is_ok() {
271                        Some("name".to_string())
272                    } else {
273                        None
274                    }
275                } else {
276                    None
277                };
278                ContinueInfo {
279                    session_id: session.id.clone(),
280                    resolved_via,
281                    query: q.clone(),
282                }
283            });
284
285            Ok(SessionBootResult {
286                api_messages: session.api_messages.clone(),
287                total_input_tokens: session.total_input_tokens,
288                total_output_tokens: session.total_output_tokens,
289                session_cost: session.session_cost,
290                abort_context: session.abort_context.clone(),
291                continued: true,
292                continue_info,
293                session,
294            })
295        }
296        None => {
297            let session = Session::new(runtime.model(), runtime.thinking_level(), runtime.system_prompt());
298            Ok(SessionBootResult {
299                session,
300                api_messages: Vec::new(),
301                total_input_tokens: 0,
302                total_output_tokens: 0,
303                session_cost: 0.0,
304                abort_context: None,
305                continued: false,
306                continue_info: None,
307            })
308        }
309    }
310}