// Source: steer_core/app/domain/runtime/supervisor.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use thiserror::Error;
5use tokio::sync::{broadcast, mpsc, oneshot};
6use tokio::task::JoinHandle;
7
8use crate::api::Client as ApiClient;
9use crate::app::conversation::UserContent;
10use crate::app::domain::action::Action;
11use crate::app::domain::delta::StreamDelta;
12use crate::app::domain::event::SessionEvent;
13use crate::app::domain::reduce::apply_event_to_state;
14use crate::app::domain::session::EventStore;
15use crate::app::domain::state::AppState;
16use crate::app::domain::types::{MessageId, OpId, RequestId, SessionId};
17
18use crate::config::model::ModelId;
19use crate::primary_agents::{default_primary_agent_id, resolve_effective_config};
20use crate::prompts::system_prompt_for_model;
21use crate::session::state::SessionConfig;
22use crate::tools::ToolExecutor;
23use tracing::warn;
24
25use super::session_actor::{SessionActorHandle, SessionError, spawn_session_actor};
26use super::subscription::SessionEventSubscription;
27
28#[derive(Debug, Error)]
29pub enum RuntimeError {
30    #[error("Session not found: {session_id}")]
31    SessionNotFound { session_id: String },
32
33    #[error("Session already exists: {session_id}")]
34    SessionAlreadyExists { session_id: String },
35
36    #[error("Session error: {0}")]
37    Session(SessionError),
38
39    #[error("Event store error: {0}")]
40    EventStore(#[from] crate::app::domain::session::EventStoreError),
41
42    #[error("Channel closed")]
43    ChannelClosed,
44
45    #[error("Invalid input: {message}")]
46    InvalidInput { message: String },
47
48    #[error("Supervisor shutting down")]
49    ShuttingDown,
50}
51
52impl From<SessionError> for RuntimeError {
53    fn from(error: SessionError) -> Self {
54        match error {
55            SessionError::InvalidInput { message, .. } => RuntimeError::InvalidInput { message },
56            other => RuntimeError::Session(other),
57        }
58    }
59}
60
61pub(crate) enum SupervisorCmd {
62    CreateSession {
63        config: Box<SessionConfig>,
64        reply: oneshot::Sender<Result<SessionId, RuntimeError>>,
65    },
66    ResumeSession {
67        session_id: SessionId,
68        reply: oneshot::Sender<Result<(), RuntimeError>>,
69    },
70    SuspendSession {
71        session_id: SessionId,
72        reply: oneshot::Sender<Result<(), RuntimeError>>,
73    },
74    DeleteSession {
75        session_id: SessionId,
76        reply: oneshot::Sender<Result<(), RuntimeError>>,
77    },
78    DispatchAction {
79        session_id: SessionId,
80        action: Box<Action>,
81        reply: oneshot::Sender<Result<(), RuntimeError>>,
82    },
83    SubscribeEvents {
84        session_id: SessionId,
85        reply: oneshot::Sender<Result<SessionEventSubscription, RuntimeError>>,
86    },
87    SubscribeDeltas {
88        session_id: SessionId,
89        reply: oneshot::Sender<Result<broadcast::Receiver<StreamDelta>, RuntimeError>>,
90    },
91    LoadEventsAfter {
92        session_id: SessionId,
93        after_seq: u64,
94        reply: oneshot::Sender<Result<Vec<(u64, SessionEvent)>, RuntimeError>>,
95    },
96    GetSessionState {
97        session_id: SessionId,
98        reply: oneshot::Sender<Result<AppState, RuntimeError>>,
99    },
100    IsSessionActive {
101        session_id: SessionId,
102        reply: oneshot::Sender<bool>,
103    },
104    ListActiveSessions {
105        reply: oneshot::Sender<Vec<SessionId>>,
106    },
107    ListAllSessions {
108        reply: oneshot::Sender<Result<Vec<SessionId>, RuntimeError>>,
109    },
110    SessionExists {
111        session_id: SessionId,
112        reply: oneshot::Sender<Result<bool, RuntimeError>>,
113    },
114    Shutdown,
115}
116
117struct RuntimeSupervisor {
118    sessions: HashMap<SessionId, SessionActorHandle>,
119    event_store: Arc<dyn EventStore>,
120    api_client: Arc<ApiClient>,
121    tool_executor: Arc<ToolExecutor>,
122}
123
124impl RuntimeSupervisor {
125    fn new(
126        event_store: Arc<dyn EventStore>,
127        api_client: Arc<ApiClient>,
128        tool_executor: Arc<ToolExecutor>,
129    ) -> Self {
130        Self {
131            sessions: HashMap::new(),
132            event_store,
133            api_client,
134            tool_executor,
135        }
136    }
137
138    async fn run(mut self, mut cmd_rx: mpsc::Receiver<SupervisorCmd>) {
139        loop {
140            tokio::select! {
141                Some(cmd) = cmd_rx.recv() => {
142                    match cmd {
143                        SupervisorCmd::CreateSession { config, reply } => {
144                            let result = self.create_session(*config).await;
145                            let _ = reply.send(result);
146                        }
147                        SupervisorCmd::ResumeSession { session_id, reply } => {
148                            let result = self.resume_session(session_id).await;
149                            let _ = reply.send(result);
150                        }
151                        SupervisorCmd::SuspendSession { session_id, reply } => {
152                            let result = self.suspend_session(session_id).await;
153                            let _ = reply.send(result);
154                        }
155                        SupervisorCmd::DeleteSession { session_id, reply } => {
156                            let result = self.delete_session(session_id).await;
157                            let _ = reply.send(result);
158                        }
159                        SupervisorCmd::DispatchAction { session_id, action, reply } => {
160                            let result = self.dispatch_action(session_id, *action).await;
161                            let _ = reply.send(result);
162                        }
163                        SupervisorCmd::SubscribeEvents { session_id, reply } => {
164                            let result = self.subscribe_events(session_id).await;
165                            let _ = reply.send(result);
166                        }
167                        SupervisorCmd::SubscribeDeltas { session_id, reply } => {
168                            let result = self.subscribe_deltas(session_id).await;
169                            let _ = reply.send(result);
170                        }
171                        SupervisorCmd::LoadEventsAfter {
172                            session_id,
173                            after_seq,
174                            reply,
175                        } => {
176                            let result = self
177                                .event_store
178                                .load_events_after(session_id, after_seq)
179                                .await
180                                .map_err(RuntimeError::from);
181                            let _ = reply.send(result);
182                        }
183                        SupervisorCmd::GetSessionState { session_id, reply } => {
184                            let result = self.get_session_state(session_id).await;
185                            let _ = reply.send(result);
186                        }
187                        SupervisorCmd::IsSessionActive { session_id, reply } => {
188                            let is_active = self.sessions.contains_key(&session_id);
189                            let _ = reply.send(is_active);
190                        }
191                        SupervisorCmd::ListActiveSessions { reply } => {
192                            let sessions: Vec<SessionId> = self.sessions.keys().copied().collect();
193                            let _ = reply.send(sessions);
194                        }
195                        SupervisorCmd::ListAllSessions { reply } => {
196                            let result = self.event_store.list_session_ids().await
197                                .map_err(RuntimeError::from);
198                            let _ = reply.send(result);
199                        }
200                        SupervisorCmd::SessionExists { session_id, reply } => {
201                            let result = self.event_store.session_exists(session_id).await
202                                .map_err(RuntimeError::from);
203                            let _ = reply.send(result);
204                        }
205                        SupervisorCmd::Shutdown => {
206                            self.shutdown_all().await;
207                            break;
208                        }
209                    }
210                }
211                else => break,
212            }
213        }
214
215        tracing::info!("Runtime supervisor stopped");
216    }
217
218    async fn create_session(&mut self, config: SessionConfig) -> Result<SessionId, RuntimeError> {
219        let session_id = SessionId::new();
220
221        self.event_store.create_session(session_id).await?;
222
223        let mut config = config;
224
225        if config.primary_agent_id.is_none() {
226            config.primary_agent_id = Some(default_primary_agent_id().to_string());
227        }
228        let mut config = resolve_effective_config(&config);
229
230        let system_context = self.resolve_system_context(&config).await;
231        if let Some(context) = &system_context {
232            config.system_prompt = Some(context.prompt.clone());
233        }
234
235        let session_created_event = SessionEvent::SessionCreated {
236            config: Box::new(config.clone()),
237            metadata: config.metadata.clone(),
238            parent_session_id: None,
239        };
240        self.event_store
241            .append(session_id, &session_created_event)
242            .await?;
243
244        let mut state = AppState::new(session_id);
245        state.apply_session_config(&config, config.primary_agent_id.clone(), true);
246        if let Some(system_context) = system_context {
247            state.cached_system_context = Some(system_context);
248        }
249
250        let handle = spawn_session_actor(
251            session_id,
252            state,
253            self.event_store.clone(),
254            self.api_client.clone(),
255            self.tool_executor.clone(),
256        );
257        self.sessions.insert(session_id, handle);
258
259        tracing::info!(session_id = %session_id, "Created session");
260
261        Ok(session_id)
262    }
263
264    async fn resume_session(&mut self, session_id: SessionId) -> Result<(), RuntimeError> {
265        if self.sessions.contains_key(&session_id) {
266            return Ok(());
267        }
268
269        if !self.event_store.session_exists(session_id).await? {
270            return Err(RuntimeError::SessionNotFound {
271                session_id: session_id.to_string(),
272            });
273        }
274
275        let events = self.event_store.load_events(session_id).await?;
276
277        let mut state = AppState::new(session_id);
278        for (_, event) in &events {
279            apply_event_to_state(&mut state, event);
280        }
281
282        if let Some(config) = state.session_config.clone() {
283            let mut resolved = resolve_effective_config(&config);
284            let system_context = self.resolve_system_context(&resolved).await;
285            if let Some(context) = &system_context {
286                resolved.system_prompt = Some(context.prompt.clone());
287            }
288            state.apply_session_config(&resolved, resolved.primary_agent_id.clone(), false);
289            state.cached_system_context = system_context;
290        }
291
292        let should_drain_queue = !state.has_active_operation() && !state.queued_work.is_empty();
293        let handle = spawn_session_actor(
294            session_id,
295            state,
296            self.event_store.clone(),
297            self.api_client.clone(),
298            self.tool_executor.clone(),
299        );
300
301        if should_drain_queue {
302            handle
303                .dispatch(Action::DrainQueuedWork { session_id })
304                .await?;
305        }
306
307        self.sessions.insert(session_id, handle);
308
309        tracing::info!(
310            session_id = %session_id,
311            event_count = events.len(),
312            "Resumed session"
313        );
314
315        Ok(())
316    }
317
318    async fn resolve_system_context(
319        &self,
320        config: &SessionConfig,
321    ) -> Option<crate::app::SystemContext> {
322        let prompt = config
323            .system_prompt
324            .as_ref()
325            .and_then(|prompt| {
326                if prompt.trim().is_empty() {
327                    None
328                } else {
329                    Some(prompt.clone())
330                }
331            })
332            .unwrap_or_else(|| system_prompt_for_model(&config.default_model));
333
334        let workspace = match self.tool_executor.workspace() {
335            Some(workspace) => workspace,
336            None => return Some(crate::app::SystemContext::new(prompt)),
337        };
338
339        let environment = match workspace.environment().await {
340            Ok(env_info) => Some(env_info),
341            Err(error) => {
342                warn!(error = %error, "Failed to collect environment info for system context");
343                None
344            }
345        };
346
347        Some(crate::app::SystemContext::with_environment(
348            prompt,
349            environment,
350        ))
351    }
352
353    async fn suspend_session(&mut self, session_id: SessionId) -> Result<(), RuntimeError> {
354        if let Some(handle) = self.sessions.remove(&session_id) {
355            let _ = handle.suspend().await;
356            tracing::info!(session_id = %session_id, "Suspended session");
357        }
358        Ok(())
359    }
360
361    async fn delete_session(&mut self, session_id: SessionId) -> Result<(), RuntimeError> {
362        if let Some(handle) = self.sessions.remove(&session_id) {
363            handle.shutdown();
364        }
365
366        self.event_store.delete_session(session_id).await?;
367
368        tracing::info!(session_id = %session_id, "Deleted session");
369
370        Ok(())
371    }
372
373    async fn dispatch_action(
374        &mut self,
375        session_id: SessionId,
376        action: Action,
377    ) -> Result<(), RuntimeError> {
378        if !self.sessions.contains_key(&session_id) {
379            self.resume_session(session_id).await?;
380        }
381
382        let handle =
383            self.sessions
384                .get(&session_id)
385                .ok_or_else(|| RuntimeError::SessionNotFound {
386                    session_id: session_id.to_string(),
387                })?;
388
389        handle.dispatch(action).await?;
390
391        Ok(())
392    }
393
394    async fn subscribe_events(
395        &mut self,
396        session_id: SessionId,
397    ) -> Result<SessionEventSubscription, RuntimeError> {
398        if !self.sessions.contains_key(&session_id) {
399            self.resume_session(session_id).await?;
400        }
401
402        let handle =
403            self.sessions
404                .get(&session_id)
405                .ok_or_else(|| RuntimeError::SessionNotFound {
406                    session_id: session_id.to_string(),
407                })?;
408
409        let subscription = handle.subscribe().await?;
410
411        Ok(subscription)
412    }
413
414    async fn subscribe_deltas(
415        &mut self,
416        session_id: SessionId,
417    ) -> Result<broadcast::Receiver<StreamDelta>, RuntimeError> {
418        if !self.sessions.contains_key(&session_id) {
419            self.resume_session(session_id).await?;
420        }
421
422        let handle =
423            self.sessions
424                .get(&session_id)
425                .ok_or_else(|| RuntimeError::SessionNotFound {
426                    session_id: session_id.to_string(),
427                })?;
428
429        let delta_rx = handle.subscribe_deltas().await?;
430
431        Ok(delta_rx)
432    }
433
434    async fn get_session_state(&mut self, session_id: SessionId) -> Result<AppState, RuntimeError> {
435        if !self.sessions.contains_key(&session_id) {
436            self.resume_session(session_id).await?;
437        }
438
439        let handle =
440            self.sessions
441                .get(&session_id)
442                .ok_or_else(|| RuntimeError::SessionNotFound {
443                    session_id: session_id.to_string(),
444                })?;
445
446        let state = handle.get_state().await?;
447
448        Ok(state)
449    }
450
451    async fn shutdown_all(&mut self) {
452        for (session_id, handle) in self.sessions.drain() {
453            handle.shutdown();
454            tracing::debug!(session_id = %session_id, "Shutting down session");
455        }
456    }
457}
458
459#[derive(Clone)]
460pub struct RuntimeHandle {
461    tx: mpsc::Sender<SupervisorCmd>,
462}
463
464impl RuntimeHandle {
465    pub async fn create_session(&self, config: SessionConfig) -> Result<SessionId, RuntimeError> {
466        let (reply_tx, reply_rx) = oneshot::channel();
467        self.tx
468            .send(SupervisorCmd::CreateSession {
469                config: Box::new(config),
470                reply: reply_tx,
471            })
472            .await
473            .map_err(|_| RuntimeError::ChannelClosed)?;
474        reply_rx.await.map_err(|_| RuntimeError::ChannelClosed)?
475    }
476
477    pub async fn resume_session(&self, session_id: SessionId) -> Result<(), RuntimeError> {
478        let (reply_tx, reply_rx) = oneshot::channel();
479        self.tx
480            .send(SupervisorCmd::ResumeSession {
481                session_id,
482                reply: reply_tx,
483            })
484            .await
485            .map_err(|_| RuntimeError::ChannelClosed)?;
486        reply_rx.await.map_err(|_| RuntimeError::ChannelClosed)?
487    }
488
489    pub async fn suspend_session(&self, session_id: SessionId) -> Result<(), RuntimeError> {
490        let (reply_tx, reply_rx) = oneshot::channel();
491        self.tx
492            .send(SupervisorCmd::SuspendSession {
493                session_id,
494                reply: reply_tx,
495            })
496            .await
497            .map_err(|_| RuntimeError::ChannelClosed)?;
498        reply_rx.await.map_err(|_| RuntimeError::ChannelClosed)?
499    }
500
501    pub async fn delete_session(&self, session_id: SessionId) -> Result<(), RuntimeError> {
502        let (reply_tx, reply_rx) = oneshot::channel();
503        self.tx
504            .send(SupervisorCmd::DeleteSession {
505                session_id,
506                reply: reply_tx,
507            })
508            .await
509            .map_err(|_| RuntimeError::ChannelClosed)?;
510        reply_rx.await.map_err(|_| RuntimeError::ChannelClosed)?
511    }
512
513    pub async fn dispatch_action(
514        &self,
515        session_id: SessionId,
516        action: Action,
517    ) -> Result<(), RuntimeError> {
518        let (reply_tx, reply_rx) = oneshot::channel();
519        self.tx
520            .send(SupervisorCmd::DispatchAction {
521                session_id,
522                action: Box::new(action),
523                reply: reply_tx,
524            })
525            .await
526            .map_err(|_| RuntimeError::ChannelClosed)?;
527        reply_rx.await.map_err(|_| RuntimeError::ChannelClosed)?
528    }
529
530    pub async fn subscribe_events(
531        &self,
532        session_id: SessionId,
533    ) -> Result<SessionEventSubscription, RuntimeError> {
534        let (reply_tx, reply_rx) = oneshot::channel();
535        self.tx
536            .send(SupervisorCmd::SubscribeEvents {
537                session_id,
538                reply: reply_tx,
539            })
540            .await
541            .map_err(|_| RuntimeError::ChannelClosed)?;
542        reply_rx.await.map_err(|_| RuntimeError::ChannelClosed)?
543    }
544
545    pub async fn subscribe_deltas(
546        &self,
547        session_id: SessionId,
548    ) -> Result<broadcast::Receiver<StreamDelta>, RuntimeError> {
549        let (reply_tx, reply_rx) = oneshot::channel();
550        self.tx
551            .send(SupervisorCmd::SubscribeDeltas {
552                session_id,
553                reply: reply_tx,
554            })
555            .await
556            .map_err(|_| RuntimeError::ChannelClosed)?;
557        reply_rx.await.map_err(|_| RuntimeError::ChannelClosed)?
558    }
559
560    pub async fn load_events_after(
561        &self,
562        session_id: SessionId,
563        after_seq: u64,
564    ) -> Result<Vec<(u64, SessionEvent)>, RuntimeError> {
565        let (reply_tx, reply_rx) = oneshot::channel();
566        self.tx
567            .send(SupervisorCmd::LoadEventsAfter {
568                session_id,
569                after_seq,
570                reply: reply_tx,
571            })
572            .await
573            .map_err(|_| RuntimeError::ChannelClosed)?;
574        reply_rx.await.map_err(|_| RuntimeError::ChannelClosed)?
575    }
576
577    pub async fn get_session_state(&self, session_id: SessionId) -> Result<AppState, RuntimeError> {
578        let (reply_tx, reply_rx) = oneshot::channel();
579        self.tx
580            .send(SupervisorCmd::GetSessionState {
581                session_id,
582                reply: reply_tx,
583            })
584            .await
585            .map_err(|_| RuntimeError::ChannelClosed)?;
586        reply_rx.await.map_err(|_| RuntimeError::ChannelClosed)?
587    }
588
589    pub async fn is_session_active(&self, session_id: SessionId) -> Result<bool, RuntimeError> {
590        let (reply_tx, reply_rx) = oneshot::channel();
591        self.tx
592            .send(SupervisorCmd::IsSessionActive {
593                session_id,
594                reply: reply_tx,
595            })
596            .await
597            .map_err(|_| RuntimeError::ChannelClosed)?;
598        reply_rx.await.map_err(|_| RuntimeError::ChannelClosed)
599    }
600
601    pub async fn list_active_sessions(&self) -> Result<Vec<SessionId>, RuntimeError> {
602        let (reply_tx, reply_rx) = oneshot::channel();
603        self.tx
604            .send(SupervisorCmd::ListActiveSessions { reply: reply_tx })
605            .await
606            .map_err(|_| RuntimeError::ChannelClosed)?;
607        reply_rx.await.map_err(|_| RuntimeError::ChannelClosed)
608    }
609
610    pub async fn submit_user_input(
611        &self,
612        session_id: SessionId,
613        content: Vec<UserContent>,
614        model: ModelId,
615    ) -> Result<OpId, RuntimeError> {
616        let has_text = content
617            .iter()
618            .any(|item| matches!(item, UserContent::Text { text } if !text.trim().is_empty()));
619        let has_non_text = content
620            .iter()
621            .any(|item| !matches!(item, UserContent::Text { .. }));
622        if !has_text && !has_non_text {
623            return Err(RuntimeError::InvalidInput {
624                message: "Input text cannot be empty".to_string(),
625            });
626        }
627
628        let op_id = OpId::new();
629        let message_id = MessageId::new();
630        let timestamp = current_timestamp();
631
632        let action = Action::UserInput {
633            session_id,
634            content,
635            op_id,
636            message_id,
637            model,
638            timestamp,
639        };
640
641        self.dispatch_action(session_id, action).await?;
642
643        Ok(op_id)
644    }
645
646    pub async fn submit_tool_approval(
647        &self,
648        session_id: SessionId,
649        request_id: RequestId,
650        approved: bool,
651        remember: Option<crate::app::domain::action::ApprovalMemory>,
652    ) -> Result<(), RuntimeError> {
653        use crate::app::domain::action::ApprovalDecision;
654
655        let decision = if approved {
656            ApprovalDecision::Approved
657        } else {
658            ApprovalDecision::Denied
659        };
660
661        let action = Action::ToolApprovalDecided {
662            session_id,
663            request_id,
664            decision,
665            remember,
666        };
667
668        self.dispatch_action(session_id, action).await
669    }
670
671    pub async fn switch_primary_agent(
672        &self,
673        session_id: SessionId,
674        agent_id: String,
675    ) -> Result<(), RuntimeError> {
676        let action = Action::SwitchPrimaryAgent {
677            session_id,
678            agent_id,
679        };
680        self.dispatch_action(session_id, action).await
681    }
682
683    pub async fn cancel_operation(
684        &self,
685        session_id: SessionId,
686        op_id: Option<OpId>,
687    ) -> Result<(), RuntimeError> {
688        let action = Action::Cancel { session_id, op_id };
689        self.dispatch_action(session_id, action).await
690    }
691
692    pub async fn submit_edited_message(
693        &self,
694        session_id: SessionId,
695        original_message_id: String,
696        new_content: Vec<UserContent>,
697        model: ModelId,
698    ) -> Result<OpId, RuntimeError> {
699        let op_id = OpId::new();
700        let new_message_id = MessageId::new();
701        let timestamp = current_timestamp();
702
703        let action = Action::UserEditedMessage {
704            session_id,
705            message_id: MessageId::from_string(original_message_id),
706            new_content,
707            op_id,
708            new_message_id,
709            model,
710            timestamp,
711        };
712
713        self.dispatch_action(session_id, action).await?;
714        Ok(op_id)
715    }
716
717    pub async fn submit_dequeue_queued_item(
718        &self,
719        session_id: SessionId,
720    ) -> Result<(), RuntimeError> {
721        let state = self.get_session_state(session_id).await?;
722        if state.queued_work.is_empty() {
723            return Err(RuntimeError::InvalidInput {
724                message: "No queued item to remove".to_string(),
725            });
726        }
727
728        let action = Action::DequeueQueuedItem { session_id };
729        self.dispatch_action(session_id, action).await
730    }
731
732    pub async fn compact_session(
733        &self,
734        session_id: SessionId,
735        model: ModelId,
736    ) -> Result<OpId, RuntimeError> {
737        let op_id = OpId::new();
738
739        let action = Action::RequestCompaction {
740            session_id,
741            op_id,
742            model,
743        };
744
745        self.dispatch_action(session_id, action).await?;
746        Ok(op_id)
747    }
748
749    pub async fn execute_bash_command(
750        &self,
751        session_id: SessionId,
752        command: String,
753    ) -> Result<OpId, RuntimeError> {
754        let op_id = OpId::new();
755        let message_id = MessageId::new();
756        let timestamp = current_timestamp();
757
758        let action = Action::DirectBashCommand {
759            session_id,
760            op_id,
761            message_id,
762            command,
763            timestamp,
764        };
765
766        self.dispatch_action(session_id, action).await?;
767        Ok(op_id)
768    }
769
770    pub async fn list_all_sessions(&self) -> Result<Vec<SessionId>, RuntimeError> {
771        let (reply_tx, reply_rx) = oneshot::channel();
772        self.tx
773            .send(SupervisorCmd::ListAllSessions { reply: reply_tx })
774            .await
775            .map_err(|_| RuntimeError::ChannelClosed)?;
776        reply_rx.await.map_err(|_| RuntimeError::ChannelClosed)?
777    }
778
779    pub async fn session_exists(&self, session_id: SessionId) -> Result<bool, RuntimeError> {
780        let (reply_tx, reply_rx) = oneshot::channel();
781        self.tx
782            .send(SupervisorCmd::SessionExists {
783                session_id,
784                reply: reply_tx,
785            })
786            .await
787            .map_err(|_| RuntimeError::ChannelClosed)?;
788        reply_rx.await.map_err(|_| RuntimeError::ChannelClosed)?
789    }
790
791    pub fn shutdown(&self) {
792        let _ = self.tx.try_send(SupervisorCmd::Shutdown);
793    }
794}
795
796pub struct RuntimeService {
797    pub handle: RuntimeHandle,
798    task: JoinHandle<()>,
799}
800
801impl RuntimeService {
802    pub fn spawn(
803        event_store: Arc<dyn EventStore>,
804        api_client: Arc<ApiClient>,
805        tool_executor: Arc<ToolExecutor>,
806    ) -> Self {
807        let (tx, rx) = mpsc::channel(64);
808
809        let supervisor = RuntimeSupervisor::new(event_store, api_client, tool_executor);
810        let task = tokio::spawn(supervisor.run(rx));
811
812        let handle = RuntimeHandle { tx };
813
814        Self { handle, task }
815    }
816
817    pub fn handle(&self) -> RuntimeHandle {
818        self.handle.clone()
819    }
820
821    pub async fn shutdown(self) {
822        self.handle.shutdown();
823        let _ = self.task.await;
824    }
825}
826
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch.
fn current_timestamp() -> u64 {
    let since_epoch = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH);
    since_epoch.map_or(0, |elapsed| elapsed.as_secs())
}
833
#[cfg(test)]
mod tests {
    use super::*;
    use crate::app::domain::session::event_store::InMemoryEventStore;
    use crate::app::validation::ValidatorRegistry;
    use crate::tools::BackendRegistry;

    /// Builds the three supervisor dependencies: an in-memory event store, an
    /// API client built from the test LLM config provider, and a tool
    /// executor with freshly constructed backend and validator registries.
    async fn create_test_deps() -> (Arc<dyn EventStore>, Arc<ApiClient>, Arc<ToolExecutor>) {
        let model_registry = Arc::new(crate::model_registry::ModelRegistry::load(&[]).unwrap());
        let provider_registry = Arc::new(crate::auth::ProviderRegistry::load(&[]).unwrap());
        let api_client = Arc::new(ApiClient::new_with_deps(
            crate::test_utils::test_llm_config_provider().unwrap(),
            provider_registry,
            model_registry,
        ));

        let tool_executor = Arc::new(ToolExecutor::with_components(
            Arc::new(BackendRegistry::new()),
            Arc::new(ValidatorRegistry::new()),
        ));

        let event_store = Arc::new(InMemoryEventStore::new());

        (event_store, api_client, tool_executor)
    }

    /// Minimal session config: local workspace at the current directory, no
    /// explicit system prompt and no explicit primary agent.
    fn test_session_config() -> SessionConfig {
        SessionConfig {
            default_model: crate::config::model::builtin::claude_sonnet_4_5(),
            workspace: crate::session::state::WorkspaceConfig::Local {
                path: std::env::current_dir().unwrap(),
            },
            workspace_ref: None,
            workspace_id: None,
            repo_ref: None,
            parent_session_id: None,
            workspace_name: None,
            tool_config: crate::session::state::SessionToolConfig::default(),
            system_prompt: None,
            primary_agent_id: None,
            policy_overrides: crate::session::state::SessionPolicyOverrides::empty(),
            metadata: std::collections::HashMap::new(),
        }
    }

    /// Spawns a runtime service on test dependencies and creates one session
    /// in it.
    async fn spawn_service_with_session() -> (RuntimeService, SessionId) {
        let (event_store, api_client, tool_executor) = create_test_deps().await;
        let service = RuntimeService::spawn(event_store, api_client, tool_executor);

        let session_id = service
            .handle
            .create_session(test_session_config())
            .await
            .unwrap();

        (service, session_id)
    }

    /// A freshly created session is reported as active.
    #[tokio::test]
    async fn test_create_session() {
        let (service, session_id) = spawn_service_with_session().await;

        let active = service.handle.is_session_active(session_id).await.unwrap();
        assert!(active);

        service.shutdown().await;
    }

    /// Suspending deactivates a session; resuming activates it again.
    #[tokio::test]
    async fn test_suspend_and_resume_session() {
        let (service, session_id) = spawn_service_with_session().await;
        let runtime = &service.handle;

        runtime.suspend_session(session_id).await.unwrap();
        assert!(!runtime.is_session_active(session_id).await.unwrap());

        runtime.resume_session(session_id).await.unwrap();
        assert!(runtime.is_session_active(session_id).await.unwrap());

        service.shutdown().await;
    }

    /// A deleted session is inactive and can no longer be resumed.
    #[tokio::test]
    async fn test_delete_session() {
        let (service, session_id) = spawn_service_with_session().await;
        let runtime = &service.handle;

        runtime.delete_session(session_id).await.unwrap();
        assert!(!runtime.is_session_active(session_id).await.unwrap());

        let resume_outcome = runtime.resume_session(session_id).await;
        assert!(matches!(
            resume_outcome,
            Err(RuntimeError::SessionNotFound { .. })
        ));

        service.shutdown().await;
    }

    /// Subscribing to the events of an active session succeeds.
    #[tokio::test]
    async fn test_subscribe_events() {
        let (service, session_id) = spawn_service_with_session().await;

        let _subscription = service.handle.subscribe_events(session_id).await.unwrap();

        service.shutdown().await;
    }
}
949}