1use crate::{Runtime, Result, Session, latest_session, resolve_session};
7use crate::skills::registry::CommandRegistry;
8use crate::skills::keybinds::KeybindRegistry;
9use std::sync::Arc;
10use tokio::sync::RwLock;
11
12pub struct EngineOpts {
14 pub continue_session: Option<Option<String>>,
15 pub system: Option<String>,
16 pub profile: Option<String>,
17 pub no_extensions: bool,
18}
19
20pub struct BackgroundTasks {
22 watcher_shutdown: Arc<std::sync::atomic::AtomicBool>,
23 watcher_task: tokio::task::JoinHandle<()>,
24 socket_shutdown: Arc<std::sync::atomic::AtomicBool>,
25 socket_task: tokio::task::JoinHandle<()>,
26 #[allow(dead_code)] session_socket_path: String,
28 session_id: String,
29 #[allow(dead_code)]
35 log_guard: Option<tracing_appender::non_blocking::WorkerGuard>,
36}
37
38impl BackgroundTasks {
39 pub fn shutdown(&self) {
41 self.watcher_shutdown.store(true, std::sync::atomic::Ordering::Release);
42 self.socket_shutdown.store(true, std::sync::atomic::Ordering::Release);
43 crate::events::registry::unregister_session(&self.session_id);
44 }
45}
46
47impl Drop for BackgroundTasks {
48 fn drop(&mut self) {
49 self.watcher_shutdown.store(true, std::sync::atomic::Ordering::Relaxed);
50 self.socket_shutdown.store(true, std::sync::atomic::Ordering::Relaxed);
51 self.watcher_task.abort();
52 self.socket_task.abort();
53 }
54}
55
56pub struct EngineBoot {
58 pub runtime: Runtime,
59 pub config: crate::SynapsConfig,
60 pub session: Session,
61 pub api_messages: Vec<serde_json::Value>,
62 pub total_input_tokens: u64,
63 pub total_output_tokens: u64,
64 pub session_cost: f64,
65 pub abort_context: Option<String>,
66 pub continued: bool,
67 pub continue_info: Option<ContinueInfo>,
68 pub registry: Arc<CommandRegistry>,
69 pub keybind_registry: Arc<std::sync::RwLock<KeybindRegistry>>,
74 pub mcp_server_count: usize,
75 pub system_prompt_path: std::path::PathBuf,
76 pub ext_manager: Arc<RwLock<crate::extensions::manager::ExtensionManager>>,
77 pub background: BackgroundTasks,
79}
80
81pub struct ContinueInfo {
83 pub session_id: String,
84 pub resolved_via: Option<String>, pub query: String,
86}
87
88pub async fn boot(opts: EngineOpts) -> Result<EngineBoot> {
91 if let Some(ref prof) = opts.profile {
92 crate::config::set_profile(Some(prof.clone()));
93 }
94
95 let log_guard = crate::logging::init_logging();
106 let mut runtime = Runtime::new().await?;
107
108 let config = crate::config::load_config();
110 runtime.apply_config(&config);
111
112 let system_prompt = crate::config::resolve_system_prompt(opts.system.as_deref());
114 runtime.set_system_prompt(system_prompt);
115
116 let tools_shared = runtime.tools_shared();
118 let (registry, keybind_registry) = crate::skills::register(&tools_shared, &config).await;
119
120 let mcp_server_count = crate::mcp::setup_lazy_mcp(&runtime.tools_shared()).await;
122
123 let system_prompt_path = crate::config::resolve_read_path("system.md");
124
125 let sb = resolve_or_create_session(&mut runtime, &opts.continue_session)?;
127
128 let watcher_shutdown = Arc::new(std::sync::atomic::AtomicBool::new(false));
130 let watcher_task = {
131 let inbox_dir = crate::config::base_dir().join("inbox");
132 let event_queue = runtime.event_queue().clone();
133 let shutdown = watcher_shutdown.clone();
134 tokio::spawn(async move {
135 crate::events::watch_inbox(inbox_dir, event_queue, shutdown).await;
136 })
137 };
138
139 let abort_tasks = |ws: &Arc<std::sync::atomic::AtomicBool>, wt: &tokio::task::JoinHandle<()>| {
141 ws.store(true, std::sync::atomic::Ordering::Relaxed);
142 wt.abort();
143 };
144
145 let socket_shutdown = Arc::new(std::sync::atomic::AtomicBool::new(false));
147 let session_socket_path = crate::events::registry::socket_path_for_session(&sb.session.id);
148 let socket_task = crate::events::socket::listen_session_socket(
149 session_socket_path.clone(),
150 runtime.event_queue().clone(),
151 socket_shutdown.clone(),
152 );
153 let session_registration = crate::events::registry::SessionRegistration {
154 session_id: sb.session.id.clone(),
155 name: sb.session.name.clone(),
156 socket_path: session_socket_path.clone(),
157 pid: std::process::id(),
158 started_at: chrono::Utc::now(),
159 };
160 if let Err(e) = crate::events::registry::register_session(&session_registration) {
161 abort_tasks(&watcher_shutdown, &watcher_task);
162 socket_shutdown.store(true, std::sync::atomic::Ordering::Relaxed);
163 socket_task.abort();
164 return Err(crate::core::error::RuntimeError::Session(format!(
169 "failed to register session {}: {}",
170 session_registration.session_id, e
171 )));
172 }
173
174 let ext_mgr = crate::extensions::manager::ExtensionManager::new_with_tools(
176 Arc::clone(runtime.hook_bus()),
177 runtime.tools_shared(),
178 );
179 let ext_manager = Arc::new(RwLock::new(ext_mgr));
180 crate::runtime::openai::set_extension_manager_for_routing(Arc::clone(&ext_manager));
181
182 {
184 let mut index_record = crate::core::session_index::SessionIndexRecord::start(&sb.session.id);
185 index_record.model = Some(sb.session.model.clone());
186 index_record.profile = crate::core::config::get_profile();
187 index_record.cwd = std::env::current_dir().ok();
188 if let Err(err) = crate::core::session_index::append_record(&index_record) {
189 tracing::warn!("failed to append session start index record: {}", err);
190 }
191
192 let hook_event = crate::extensions::hooks::events::HookEvent::on_session_start(&sb.session.id);
193 let _ = runtime.hook_bus().emit(&hook_event).await;
194 }
195
196 if mcp_server_count > 0 {
197 tracing::info!("{} MCP servers available (use connect_mcp_server to activate)", mcp_server_count);
198 }
199
200 let session_id = sb.session.id.clone();
201
202 Ok(EngineBoot {
203 runtime,
204 config,
205 session: sb.session,
206 api_messages: sb.api_messages,
207 total_input_tokens: sb.total_input_tokens,
208 total_output_tokens: sb.total_output_tokens,
209 session_cost: sb.session_cost,
210 abort_context: sb.abort_context,
211 continued: sb.continued,
212 continue_info: sb.continue_info,
213 registry,
214 keybind_registry,
215 mcp_server_count,
216 system_prompt_path,
217 ext_manager,
218 background: BackgroundTasks {
219 watcher_shutdown,
220 watcher_task,
221 socket_shutdown,
222 socket_task,
223 session_socket_path,
224 session_id,
225 log_guard,
226 },
227 })
228}
229
230struct SessionBootResult {
233 session: Session,
234 api_messages: Vec<serde_json::Value>,
235 total_input_tokens: u64,
236 total_output_tokens: u64,
237 session_cost: f64,
238 abort_context: Option<String>,
239 continued: bool,
240 continue_info: Option<ContinueInfo>,
241}
242
243fn resolve_or_create_session(
244 runtime: &mut Runtime,
245 continue_session: &Option<Option<String>>,
246) -> Result<SessionBootResult> {
247 match continue_session {
248 Some(ref maybe_id) => {
249 let session = match maybe_id {
250 Some(ref id) => resolve_session(id).map_err(|e| {
251 crate::error::RuntimeError::Tool(format!("Failed to load session '{}': {}", id, e))
252 })?,
253 None => latest_session().map_err(|e| {
254 crate::error::RuntimeError::Tool(format!("No sessions to continue: {}", e))
255 })?,
256 };
257 runtime.set_model(session.model.clone());
258 if let Some(ref sp) = session.system_prompt {
259 runtime.set_system_prompt(sp.clone());
260 }
261
262 let continue_info = maybe_id.as_ref().map(|q| {
263 let resolved_via = if *q != session.id {
264 if crate::chain::load_chain(q).is_ok() {
265 Some("chain".to_string())
266 } else if crate::session::find_session_by_name(q).is_ok() {
267 Some("name".to_string())
268 } else {
269 None
270 }
271 } else {
272 None
273 };
274 ContinueInfo {
275 session_id: session.id.clone(),
276 resolved_via,
277 query: q.clone(),
278 }
279 });
280
281 Ok(SessionBootResult {
282 api_messages: session.api_messages.clone(),
283 total_input_tokens: session.total_input_tokens,
284 total_output_tokens: session.total_output_tokens,
285 session_cost: session.session_cost,
286 abort_context: session.abort_context.clone(),
287 continued: true,
288 continue_info,
289 session,
290 })
291 }
292 None => {
293 let session = Session::new(runtime.model(), runtime.thinking_level(), runtime.system_prompt());
294 Ok(SessionBootResult {
295 session,
296 api_messages: Vec::new(),
297 total_input_tokens: 0,
298 total_output_tokens: 0,
299 session_cost: 0.0,
300 abort_context: None,
301 continued: false,
302 continue_info: None,
303 })
304 }
305 }
306}