1use crate::{Runtime, Result, Session, latest_session, resolve_session};
7use crate::skills::registry::CommandRegistry;
8use crate::skills::keybinds::KeybindRegistry;
9use std::sync::Arc;
10use tokio::sync::RwLock;
11
/// Caller-supplied options consumed by `boot`.
///
/// `Default` yields "fresh session, no overrides, extensions flag off".
#[derive(Debug, Clone, Default)]
pub struct EngineOpts {
    /// Session continuation request:
    /// * `None` starts a brand-new session,
    /// * `Some(None)` continues the latest session,
    /// * `Some(Some(query))` resolves `query` to an existing session.
    pub continue_session: Option<Option<String>>,
    /// Optional system-prompt override, handed to `resolve_system_prompt`.
    pub system: Option<String>,
    /// Optional profile name; when present it is installed with
    /// `set_profile` before logging and the runtime are initialised.
    pub profile: Option<String>,
    /// Copied unchanged into `EngineBoot::no_extensions`.
    pub no_extensions: bool,
}
19
20pub struct BackgroundTasks {
22 watcher_shutdown: Arc<std::sync::atomic::AtomicBool>,
23 watcher_task: tokio::task::JoinHandle<()>,
24 socket_shutdown: Arc<std::sync::atomic::AtomicBool>,
25 socket_task: tokio::task::JoinHandle<()>,
26 #[allow(dead_code)] session_socket_path: String,
28 session_id: String,
29 #[allow(dead_code)]
35 log_guard: Option<tracing_appender::non_blocking::WorkerGuard>,
36}
37
38impl BackgroundTasks {
39 pub fn shutdown(&self) {
41 self.watcher_shutdown.store(true, std::sync::atomic::Ordering::Release);
42 self.socket_shutdown.store(true, std::sync::atomic::Ordering::Release);
43 crate::events::registry::unregister_session(&self.session_id);
44 }
45}
46
47impl Drop for BackgroundTasks {
48 fn drop(&mut self) {
49 self.watcher_shutdown.store(true, std::sync::atomic::Ordering::Relaxed);
50 self.socket_shutdown.store(true, std::sync::atomic::Ordering::Relaxed);
51 self.watcher_task.abort();
52 self.socket_task.abort();
53 }
54}
55
56pub struct EngineBoot {
58 pub runtime: Runtime,
59 pub config: crate::SynapsConfig,
60 pub no_extensions: bool,
63 pub session: Session,
64 pub api_messages: Vec<serde_json::Value>,
65 pub total_input_tokens: u64,
66 pub total_output_tokens: u64,
67 pub session_cost: f64,
68 pub abort_context: Option<String>,
69 pub continued: bool,
70 pub continue_info: Option<ContinueInfo>,
71 pub registry: Arc<CommandRegistry>,
72 pub keybind_registry: Arc<std::sync::RwLock<KeybindRegistry>>,
77 pub mcp_server_count: usize,
78 pub system_prompt_path: std::path::PathBuf,
79 pub ext_manager: Arc<RwLock<crate::extensions::manager::ExtensionManager>>,
80 pub background: BackgroundTasks,
82}
83
/// Describes how an explicit "continue" query was matched to a session.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ContinueInfo {
    /// Id of the session that was actually resumed.
    pub session_id: String,
    /// `Some("chain")` or `Some("name")` when the query differed from the
    /// session id and could be loaded as a chain or found as a session name
    /// respectively; `None` otherwise.
    pub resolved_via: Option<String>,
    /// The query string exactly as supplied by the caller.
    pub query: String,
}
90
91pub async fn boot(opts: EngineOpts) -> Result<EngineBoot> {
94 if let Some(ref prof) = opts.profile {
95 crate::config::set_profile(Some(prof.clone()));
96 }
97
98 let log_guard = crate::logging::init_logging();
109 let mut runtime = Runtime::new().await?;
110
111 let config = crate::config::load_config();
113 runtime.apply_config(&config);
114
115 let system_prompt = crate::config::resolve_system_prompt(opts.system.as_deref());
117 runtime.set_system_prompt(system_prompt);
118
119 let tools_shared = runtime.tools_shared();
121 let (registry, keybind_registry) = crate::skills::register(&tools_shared, &config).await;
122
123 let mcp_server_count = crate::mcp::setup_lazy_mcp(&runtime.tools_shared()).await;
125
126 let system_prompt_path = crate::config::resolve_read_path("system.md");
127
128 let sb = resolve_or_create_session(&mut runtime, &opts.continue_session)?;
130
131 let watcher_shutdown = Arc::new(std::sync::atomic::AtomicBool::new(false));
133 let watcher_task = {
134 let inbox_dir = crate::config::base_dir().join("inbox");
135 let event_queue = runtime.event_queue().clone();
136 let shutdown = watcher_shutdown.clone();
137 tokio::spawn(async move {
138 crate::events::watch_inbox(inbox_dir, event_queue, shutdown).await;
139 })
140 };
141
142 let abort_tasks = |ws: &Arc<std::sync::atomic::AtomicBool>, wt: &tokio::task::JoinHandle<()>| {
144 ws.store(true, std::sync::atomic::Ordering::Relaxed);
145 wt.abort();
146 };
147
148 let socket_shutdown = Arc::new(std::sync::atomic::AtomicBool::new(false));
150 let session_socket_path = crate::events::registry::socket_path_for_session(&sb.session.id);
151 let socket_task = crate::events::socket::listen_session_socket(
152 session_socket_path.clone(),
153 runtime.event_queue().clone(),
154 socket_shutdown.clone(),
155 );
156 let session_registration = crate::events::registry::SessionRegistration {
157 session_id: sb.session.id.clone(),
158 name: sb.session.name.clone(),
159 socket_path: session_socket_path.clone(),
160 pid: std::process::id(),
161 started_at: chrono::Utc::now(),
162 };
163 if let Err(e) = crate::events::registry::register_session(&session_registration) {
164 abort_tasks(&watcher_shutdown, &watcher_task);
165 socket_shutdown.store(true, std::sync::atomic::Ordering::Relaxed);
166 socket_task.abort();
167 return Err(crate::core::error::RuntimeError::Session(format!(
172 "failed to register session {}: {}",
173 session_registration.session_id, e
174 )));
175 }
176
177 let ext_mgr = crate::extensions::manager::ExtensionManager::new_with_tools(
179 Arc::clone(runtime.hook_bus()),
180 runtime.tools_shared(),
181 );
182 let ext_manager = Arc::new(RwLock::new(ext_mgr));
183 crate::runtime::openai::set_extension_manager_for_routing(Arc::clone(&ext_manager));
184
185 {
187 let mut index_record = crate::core::session_index::SessionIndexRecord::start(&sb.session.id);
188 index_record.model = Some(sb.session.model.clone());
189 index_record.profile = crate::core::config::get_profile();
190 index_record.cwd = std::env::current_dir().ok();
191 if let Err(err) = crate::core::session_index::append_record(&index_record) {
192 tracing::warn!("failed to append session start index record: {}", err);
193 }
194
195 let hook_event = crate::extensions::hooks::events::HookEvent::on_session_start(&sb.session.id);
196 let _ = runtime.hook_bus().emit(&hook_event).await;
197 }
198
199 if mcp_server_count > 0 {
200 tracing::info!("{} MCP servers available (use connect_mcp_server to activate)", mcp_server_count);
201 }
202
203 let session_id = sb.session.id.clone();
204
205 Ok(EngineBoot {
206 runtime,
207 config,
208 no_extensions: opts.no_extensions,
209 session: sb.session,
210 api_messages: sb.api_messages,
211 total_input_tokens: sb.total_input_tokens,
212 total_output_tokens: sb.total_output_tokens,
213 session_cost: sb.session_cost,
214 abort_context: sb.abort_context,
215 continued: sb.continued,
216 continue_info: sb.continue_info,
217 registry,
218 keybind_registry,
219 mcp_server_count,
220 system_prompt_path,
221 ext_manager,
222 background: BackgroundTasks {
223 watcher_shutdown,
224 watcher_task,
225 socket_shutdown,
226 socket_task,
227 session_socket_path,
228 session_id,
229 log_guard,
230 },
231 })
232}
233
234struct SessionBootResult {
237 session: Session,
238 api_messages: Vec<serde_json::Value>,
239 total_input_tokens: u64,
240 total_output_tokens: u64,
241 session_cost: f64,
242 abort_context: Option<String>,
243 continued: bool,
244 continue_info: Option<ContinueInfo>,
245}
246
247fn resolve_or_create_session(
248 runtime: &mut Runtime,
249 continue_session: &Option<Option<String>>,
250) -> Result<SessionBootResult> {
251 match continue_session {
252 Some(ref maybe_id) => {
253 let session = match maybe_id {
254 Some(ref id) => resolve_session(id).map_err(|e| {
255 crate::error::RuntimeError::Tool(format!("Failed to load session '{}': {}", id, e))
256 })?,
257 None => latest_session().map_err(|e| {
258 crate::error::RuntimeError::Tool(format!("No sessions to continue: {}", e))
259 })?,
260 };
261 runtime.set_model(session.model.clone());
262 if let Some(ref sp) = session.system_prompt {
263 runtime.set_system_prompt(sp.clone());
264 }
265
266 let continue_info = maybe_id.as_ref().map(|q| {
267 let resolved_via = if *q != session.id {
268 if crate::chain::load_chain(q).is_ok() {
269 Some("chain".to_string())
270 } else if crate::session::find_session_by_name(q).is_ok() {
271 Some("name".to_string())
272 } else {
273 None
274 }
275 } else {
276 None
277 };
278 ContinueInfo {
279 session_id: session.id.clone(),
280 resolved_via,
281 query: q.clone(),
282 }
283 });
284
285 Ok(SessionBootResult {
286 api_messages: session.api_messages.clone(),
287 total_input_tokens: session.total_input_tokens,
288 total_output_tokens: session.total_output_tokens,
289 session_cost: session.session_cost,
290 abort_context: session.abort_context.clone(),
291 continued: true,
292 continue_info,
293 session,
294 })
295 }
296 None => {
297 let session = Session::new(runtime.model(), runtime.thinking_level(), runtime.system_prompt());
298 Ok(SessionBootResult {
299 session,
300 api_messages: Vec::new(),
301 total_input_tokens: 0,
302 total_output_tokens: 0,
303 session_cost: 0.0,
304 abort_context: None,
305 continued: false,
306 continue_info: None,
307 })
308 }
309 }
310}