// steer_core/app/domain/runtime/supervisor.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use thiserror::Error;
5use tokio::sync::{broadcast, mpsc, oneshot};
6use tokio::task::JoinHandle;
7
8use crate::api::Client as ApiClient;
9use crate::app::domain::action::Action;
10use crate::app::domain::delta::StreamDelta;
11use crate::app::domain::event::SessionEvent;
12use crate::app::domain::reduce::apply_event_to_state;
13use crate::app::domain::session::EventStore;
14use crate::app::domain::state::AppState;
15use crate::app::domain::types::{MessageId, NonEmptyString, OpId, RequestId, SessionId};
16use crate::config::model::ModelId;
17use crate::primary_agents::{default_primary_agent_id, resolve_effective_config};
18use crate::prompts::system_prompt_for_model;
19use crate::session::state::SessionConfig;
20use crate::tools::ToolExecutor;
21use tracing::warn;
22
23use super::session_actor::{SessionActorHandle, SessionError, spawn_session_actor};
24use super::subscription::SessionEventSubscription;
25
26#[derive(Debug, Error)]
27pub enum RuntimeError {
28    #[error("Session not found: {session_id}")]
29    SessionNotFound { session_id: String },
30
31    #[error("Session already exists: {session_id}")]
32    SessionAlreadyExists { session_id: String },
33
34    #[error("Session error: {0}")]
35    Session(SessionError),
36
37    #[error("Event store error: {0}")]
38    EventStore(#[from] crate::app::domain::session::EventStoreError),
39
40    #[error("Channel closed")]
41    ChannelClosed,
42
43    #[error("Invalid input: {message}")]
44    InvalidInput { message: String },
45
46    #[error("Supervisor shutting down")]
47    ShuttingDown,
48}
49
50impl From<SessionError> for RuntimeError {
51    fn from(error: SessionError) -> Self {
52        match error {
53            SessionError::InvalidInput { message, .. } => RuntimeError::InvalidInput { message },
54            other => RuntimeError::Session(other),
55        }
56    }
57}
58
59pub(crate) enum SupervisorCmd {
60    CreateSession {
61        config: Box<SessionConfig>,
62        reply: oneshot::Sender<Result<SessionId, RuntimeError>>,
63    },
64    ResumeSession {
65        session_id: SessionId,
66        reply: oneshot::Sender<Result<(), RuntimeError>>,
67    },
68    SuspendSession {
69        session_id: SessionId,
70        reply: oneshot::Sender<Result<(), RuntimeError>>,
71    },
72    DeleteSession {
73        session_id: SessionId,
74        reply: oneshot::Sender<Result<(), RuntimeError>>,
75    },
76    DispatchAction {
77        session_id: SessionId,
78        action: Box<Action>,
79        reply: oneshot::Sender<Result<(), RuntimeError>>,
80    },
81    SubscribeEvents {
82        session_id: SessionId,
83        reply: oneshot::Sender<Result<SessionEventSubscription, RuntimeError>>,
84    },
85    SubscribeDeltas {
86        session_id: SessionId,
87        reply: oneshot::Sender<Result<broadcast::Receiver<StreamDelta>, RuntimeError>>,
88    },
89    LoadEventsAfter {
90        session_id: SessionId,
91        after_seq: u64,
92        reply: oneshot::Sender<Result<Vec<(u64, SessionEvent)>, RuntimeError>>,
93    },
94    GetSessionState {
95        session_id: SessionId,
96        reply: oneshot::Sender<Result<AppState, RuntimeError>>,
97    },
98    IsSessionActive {
99        session_id: SessionId,
100        reply: oneshot::Sender<bool>,
101    },
102    ListActiveSessions {
103        reply: oneshot::Sender<Vec<SessionId>>,
104    },
105    ListAllSessions {
106        reply: oneshot::Sender<Result<Vec<SessionId>, RuntimeError>>,
107    },
108    SessionExists {
109        session_id: SessionId,
110        reply: oneshot::Sender<Result<bool, RuntimeError>>,
111    },
112    Shutdown,
113}
114
115struct RuntimeSupervisor {
116    sessions: HashMap<SessionId, SessionActorHandle>,
117    event_store: Arc<dyn EventStore>,
118    api_client: Arc<ApiClient>,
119    tool_executor: Arc<ToolExecutor>,
120}
121
122impl RuntimeSupervisor {
123    fn new(
124        event_store: Arc<dyn EventStore>,
125        api_client: Arc<ApiClient>,
126        tool_executor: Arc<ToolExecutor>,
127    ) -> Self {
128        Self {
129            sessions: HashMap::new(),
130            event_store,
131            api_client,
132            tool_executor,
133        }
134    }
135
136    async fn run(mut self, mut cmd_rx: mpsc::Receiver<SupervisorCmd>) {
137        loop {
138            tokio::select! {
139                Some(cmd) = cmd_rx.recv() => {
140                    match cmd {
141                        SupervisorCmd::CreateSession { config, reply } => {
142                            let result = self.create_session(*config).await;
143                            let _ = reply.send(result);
144                        }
145                        SupervisorCmd::ResumeSession { session_id, reply } => {
146                            let result = self.resume_session(session_id).await;
147                            let _ = reply.send(result);
148                        }
149                        SupervisorCmd::SuspendSession { session_id, reply } => {
150                            let result = self.suspend_session(session_id).await;
151                            let _ = reply.send(result);
152                        }
153                        SupervisorCmd::DeleteSession { session_id, reply } => {
154                            let result = self.delete_session(session_id).await;
155                            let _ = reply.send(result);
156                        }
157                        SupervisorCmd::DispatchAction { session_id, action, reply } => {
158                            let result = self.dispatch_action(session_id, *action).await;
159                            let _ = reply.send(result);
160                        }
161                        SupervisorCmd::SubscribeEvents { session_id, reply } => {
162                            let result = self.subscribe_events(session_id).await;
163                            let _ = reply.send(result);
164                        }
165                        SupervisorCmd::SubscribeDeltas { session_id, reply } => {
166                            let result = self.subscribe_deltas(session_id).await;
167                            let _ = reply.send(result);
168                        }
169                        SupervisorCmd::LoadEventsAfter {
170                            session_id,
171                            after_seq,
172                            reply,
173                        } => {
174                            let result = self
175                                .event_store
176                                .load_events_after(session_id, after_seq)
177                                .await
178                                .map_err(RuntimeError::from);
179                            let _ = reply.send(result);
180                        }
181                        SupervisorCmd::GetSessionState { session_id, reply } => {
182                            let result = self.get_session_state(session_id).await;
183                            let _ = reply.send(result);
184                        }
185                        SupervisorCmd::IsSessionActive { session_id, reply } => {
186                            let is_active = self.sessions.contains_key(&session_id);
187                            let _ = reply.send(is_active);
188                        }
189                        SupervisorCmd::ListActiveSessions { reply } => {
190                            let sessions: Vec<SessionId> = self.sessions.keys().copied().collect();
191                            let _ = reply.send(sessions);
192                        }
193                        SupervisorCmd::ListAllSessions { reply } => {
194                            let result = self.event_store.list_session_ids().await
195                                .map_err(RuntimeError::from);
196                            let _ = reply.send(result);
197                        }
198                        SupervisorCmd::SessionExists { session_id, reply } => {
199                            let result = self.event_store.session_exists(session_id).await
200                                .map_err(RuntimeError::from);
201                            let _ = reply.send(result);
202                        }
203                        SupervisorCmd::Shutdown => {
204                            self.shutdown_all().await;
205                            break;
206                        }
207                    }
208                }
209                else => break,
210            }
211        }
212
213        tracing::info!("Runtime supervisor stopped");
214    }
215
216    async fn create_session(&mut self, config: SessionConfig) -> Result<SessionId, RuntimeError> {
217        let session_id = SessionId::new();
218
219        self.event_store.create_session(session_id).await?;
220
221        let mut config = config;
222
223        if config.primary_agent_id.is_none() {
224            config.primary_agent_id = Some(default_primary_agent_id().to_string());
225        }
226        let mut config = resolve_effective_config(&config);
227
228        let system_context = self.resolve_system_context(&config).await;
229        if let Some(context) = &system_context {
230            config.system_prompt = Some(context.prompt.clone());
231        }
232
233        let session_created_event = SessionEvent::SessionCreated {
234            config: Box::new(config.clone()),
235            metadata: config.metadata.clone(),
236            parent_session_id: None,
237        };
238        self.event_store
239            .append(session_id, &session_created_event)
240            .await?;
241
242        let mut state = AppState::new(session_id);
243        state.apply_session_config(&config, config.primary_agent_id.clone(), true);
244        if let Some(system_context) = system_context {
245            state.cached_system_context = Some(system_context);
246        }
247
248        let handle = spawn_session_actor(
249            session_id,
250            state,
251            self.event_store.clone(),
252            self.api_client.clone(),
253            self.tool_executor.clone(),
254        );
255        self.sessions.insert(session_id, handle);
256
257        tracing::info!(session_id = %session_id, "Created session");
258
259        Ok(session_id)
260    }
261
262    async fn resume_session(&mut self, session_id: SessionId) -> Result<(), RuntimeError> {
263        if self.sessions.contains_key(&session_id) {
264            return Ok(());
265        }
266
267        if !self.event_store.session_exists(session_id).await? {
268            return Err(RuntimeError::SessionNotFound {
269                session_id: session_id.to_string(),
270            });
271        }
272
273        let events = self.event_store.load_events(session_id).await?;
274
275        let mut state = AppState::new(session_id);
276        for (_, event) in &events {
277            apply_event_to_state(&mut state, event);
278        }
279
280        if let Some(config) = state.session_config.clone() {
281            let mut resolved = resolve_effective_config(&config);
282            let system_context = self.resolve_system_context(&resolved).await;
283            if let Some(context) = &system_context {
284                resolved.system_prompt = Some(context.prompt.clone());
285            }
286            state.apply_session_config(&resolved, resolved.primary_agent_id.clone(), false);
287            state.cached_system_context = system_context;
288        }
289
290        let should_drain_queue = !state.has_active_operation() && !state.queued_work.is_empty();
291        let handle = spawn_session_actor(
292            session_id,
293            state,
294            self.event_store.clone(),
295            self.api_client.clone(),
296            self.tool_executor.clone(),
297        );
298
299        if should_drain_queue {
300            handle
301                .dispatch(Action::DrainQueuedWork { session_id })
302                .await?;
303        }
304
305        self.sessions.insert(session_id, handle);
306
307        tracing::info!(
308            session_id = %session_id,
309            event_count = events.len(),
310            "Resumed session"
311        );
312
313        Ok(())
314    }
315
316    async fn resolve_system_context(
317        &self,
318        config: &SessionConfig,
319    ) -> Option<crate::app::SystemContext> {
320        let prompt = config
321            .system_prompt
322            .as_ref()
323            .and_then(|prompt| {
324                if prompt.trim().is_empty() {
325                    None
326                } else {
327                    Some(prompt.clone())
328                }
329            })
330            .unwrap_or_else(|| system_prompt_for_model(&config.default_model));
331
332        let workspace = match self.tool_executor.workspace() {
333            Some(workspace) => workspace,
334            None => return Some(crate::app::SystemContext::new(prompt)),
335        };
336
337        let environment = match workspace.environment().await {
338            Ok(env_info) => Some(env_info),
339            Err(error) => {
340                warn!(error = %error, "Failed to collect environment info for system context");
341                None
342            }
343        };
344
345        Some(crate::app::SystemContext::with_environment(
346            prompt,
347            environment,
348        ))
349    }
350
351    async fn suspend_session(&mut self, session_id: SessionId) -> Result<(), RuntimeError> {
352        if let Some(handle) = self.sessions.remove(&session_id) {
353            let _ = handle.suspend().await;
354            tracing::info!(session_id = %session_id, "Suspended session");
355        }
356        Ok(())
357    }
358
359    async fn delete_session(&mut self, session_id: SessionId) -> Result<(), RuntimeError> {
360        if let Some(handle) = self.sessions.remove(&session_id) {
361            handle.shutdown();
362        }
363
364        self.event_store.delete_session(session_id).await?;
365
366        tracing::info!(session_id = %session_id, "Deleted session");
367
368        Ok(())
369    }
370
371    async fn dispatch_action(
372        &mut self,
373        session_id: SessionId,
374        action: Action,
375    ) -> Result<(), RuntimeError> {
376        if !self.sessions.contains_key(&session_id) {
377            self.resume_session(session_id).await?;
378        }
379
380        let handle =
381            self.sessions
382                .get(&session_id)
383                .ok_or_else(|| RuntimeError::SessionNotFound {
384                    session_id: session_id.to_string(),
385                })?;
386
387        handle.dispatch(action).await?;
388
389        Ok(())
390    }
391
392    async fn subscribe_events(
393        &mut self,
394        session_id: SessionId,
395    ) -> Result<SessionEventSubscription, RuntimeError> {
396        if !self.sessions.contains_key(&session_id) {
397            self.resume_session(session_id).await?;
398        }
399
400        let handle =
401            self.sessions
402                .get(&session_id)
403                .ok_or_else(|| RuntimeError::SessionNotFound {
404                    session_id: session_id.to_string(),
405                })?;
406
407        let subscription = handle.subscribe().await?;
408
409        Ok(subscription)
410    }
411
412    async fn subscribe_deltas(
413        &mut self,
414        session_id: SessionId,
415    ) -> Result<broadcast::Receiver<StreamDelta>, RuntimeError> {
416        if !self.sessions.contains_key(&session_id) {
417            self.resume_session(session_id).await?;
418        }
419
420        let handle =
421            self.sessions
422                .get(&session_id)
423                .ok_or_else(|| RuntimeError::SessionNotFound {
424                    session_id: session_id.to_string(),
425                })?;
426
427        let delta_rx = handle.subscribe_deltas().await?;
428
429        Ok(delta_rx)
430    }
431
432    async fn get_session_state(&mut self, session_id: SessionId) -> Result<AppState, RuntimeError> {
433        if !self.sessions.contains_key(&session_id) {
434            self.resume_session(session_id).await?;
435        }
436
437        let handle =
438            self.sessions
439                .get(&session_id)
440                .ok_or_else(|| RuntimeError::SessionNotFound {
441                    session_id: session_id.to_string(),
442                })?;
443
444        let state = handle.get_state().await?;
445
446        Ok(state)
447    }
448
449    async fn shutdown_all(&mut self) {
450        for (session_id, handle) in self.sessions.drain() {
451            handle.shutdown();
452            tracing::debug!(session_id = %session_id, "Shutting down session");
453        }
454    }
455}
456
457#[derive(Clone)]
458pub struct RuntimeHandle {
459    tx: mpsc::Sender<SupervisorCmd>,
460}
461
462impl RuntimeHandle {
463    pub async fn create_session(&self, config: SessionConfig) -> Result<SessionId, RuntimeError> {
464        let (reply_tx, reply_rx) = oneshot::channel();
465        self.tx
466            .send(SupervisorCmd::CreateSession {
467                config: Box::new(config),
468                reply: reply_tx,
469            })
470            .await
471            .map_err(|_| RuntimeError::ChannelClosed)?;
472        reply_rx.await.map_err(|_| RuntimeError::ChannelClosed)?
473    }
474
475    pub async fn resume_session(&self, session_id: SessionId) -> Result<(), RuntimeError> {
476        let (reply_tx, reply_rx) = oneshot::channel();
477        self.tx
478            .send(SupervisorCmd::ResumeSession {
479                session_id,
480                reply: reply_tx,
481            })
482            .await
483            .map_err(|_| RuntimeError::ChannelClosed)?;
484        reply_rx.await.map_err(|_| RuntimeError::ChannelClosed)?
485    }
486
487    pub async fn suspend_session(&self, session_id: SessionId) -> Result<(), RuntimeError> {
488        let (reply_tx, reply_rx) = oneshot::channel();
489        self.tx
490            .send(SupervisorCmd::SuspendSession {
491                session_id,
492                reply: reply_tx,
493            })
494            .await
495            .map_err(|_| RuntimeError::ChannelClosed)?;
496        reply_rx.await.map_err(|_| RuntimeError::ChannelClosed)?
497    }
498
499    pub async fn delete_session(&self, session_id: SessionId) -> Result<(), RuntimeError> {
500        let (reply_tx, reply_rx) = oneshot::channel();
501        self.tx
502            .send(SupervisorCmd::DeleteSession {
503                session_id,
504                reply: reply_tx,
505            })
506            .await
507            .map_err(|_| RuntimeError::ChannelClosed)?;
508        reply_rx.await.map_err(|_| RuntimeError::ChannelClosed)?
509    }
510
511    pub async fn dispatch_action(
512        &self,
513        session_id: SessionId,
514        action: Action,
515    ) -> Result<(), RuntimeError> {
516        let (reply_tx, reply_rx) = oneshot::channel();
517        self.tx
518            .send(SupervisorCmd::DispatchAction {
519                session_id,
520                action: Box::new(action),
521                reply: reply_tx,
522            })
523            .await
524            .map_err(|_| RuntimeError::ChannelClosed)?;
525        reply_rx.await.map_err(|_| RuntimeError::ChannelClosed)?
526    }
527
528    pub async fn subscribe_events(
529        &self,
530        session_id: SessionId,
531    ) -> Result<SessionEventSubscription, RuntimeError> {
532        let (reply_tx, reply_rx) = oneshot::channel();
533        self.tx
534            .send(SupervisorCmd::SubscribeEvents {
535                session_id,
536                reply: reply_tx,
537            })
538            .await
539            .map_err(|_| RuntimeError::ChannelClosed)?;
540        reply_rx.await.map_err(|_| RuntimeError::ChannelClosed)?
541    }
542
543    pub async fn subscribe_deltas(
544        &self,
545        session_id: SessionId,
546    ) -> Result<broadcast::Receiver<StreamDelta>, RuntimeError> {
547        let (reply_tx, reply_rx) = oneshot::channel();
548        self.tx
549            .send(SupervisorCmd::SubscribeDeltas {
550                session_id,
551                reply: reply_tx,
552            })
553            .await
554            .map_err(|_| RuntimeError::ChannelClosed)?;
555        reply_rx.await.map_err(|_| RuntimeError::ChannelClosed)?
556    }
557
558    pub async fn load_events_after(
559        &self,
560        session_id: SessionId,
561        after_seq: u64,
562    ) -> Result<Vec<(u64, SessionEvent)>, RuntimeError> {
563        let (reply_tx, reply_rx) = oneshot::channel();
564        self.tx
565            .send(SupervisorCmd::LoadEventsAfter {
566                session_id,
567                after_seq,
568                reply: reply_tx,
569            })
570            .await
571            .map_err(|_| RuntimeError::ChannelClosed)?;
572        reply_rx.await.map_err(|_| RuntimeError::ChannelClosed)?
573    }
574
575    pub async fn get_session_state(&self, session_id: SessionId) -> Result<AppState, RuntimeError> {
576        let (reply_tx, reply_rx) = oneshot::channel();
577        self.tx
578            .send(SupervisorCmd::GetSessionState {
579                session_id,
580                reply: reply_tx,
581            })
582            .await
583            .map_err(|_| RuntimeError::ChannelClosed)?;
584        reply_rx.await.map_err(|_| RuntimeError::ChannelClosed)?
585    }
586
587    pub async fn is_session_active(&self, session_id: SessionId) -> Result<bool, RuntimeError> {
588        let (reply_tx, reply_rx) = oneshot::channel();
589        self.tx
590            .send(SupervisorCmd::IsSessionActive {
591                session_id,
592                reply: reply_tx,
593            })
594            .await
595            .map_err(|_| RuntimeError::ChannelClosed)?;
596        reply_rx.await.map_err(|_| RuntimeError::ChannelClosed)
597    }
598
599    pub async fn list_active_sessions(&self) -> Result<Vec<SessionId>, RuntimeError> {
600        let (reply_tx, reply_rx) = oneshot::channel();
601        self.tx
602            .send(SupervisorCmd::ListActiveSessions { reply: reply_tx })
603            .await
604            .map_err(|_| RuntimeError::ChannelClosed)?;
605        reply_rx.await.map_err(|_| RuntimeError::ChannelClosed)
606    }
607
608    pub async fn submit_user_input(
609        &self,
610        session_id: SessionId,
611        text: String,
612        model: ModelId,
613    ) -> Result<OpId, RuntimeError> {
614        let text = NonEmptyString::new(text).ok_or_else(|| RuntimeError::InvalidInput {
615            message: "Input text cannot be empty".to_string(),
616        })?;
617
618        let op_id = OpId::new();
619        let message_id = MessageId::new();
620        let timestamp = current_timestamp();
621
622        let action = Action::UserInput {
623            session_id,
624            text,
625            op_id,
626            message_id,
627            model,
628            timestamp,
629        };
630
631        self.dispatch_action(session_id, action).await?;
632
633        Ok(op_id)
634    }
635
636    pub async fn submit_tool_approval(
637        &self,
638        session_id: SessionId,
639        request_id: RequestId,
640        approved: bool,
641        remember: Option<crate::app::domain::action::ApprovalMemory>,
642    ) -> Result<(), RuntimeError> {
643        use crate::app::domain::action::ApprovalDecision;
644
645        let decision = if approved {
646            ApprovalDecision::Approved
647        } else {
648            ApprovalDecision::Denied
649        };
650
651        let action = Action::ToolApprovalDecided {
652            session_id,
653            request_id,
654            decision,
655            remember,
656        };
657
658        self.dispatch_action(session_id, action).await
659    }
660
661    pub async fn switch_primary_agent(
662        &self,
663        session_id: SessionId,
664        agent_id: String,
665    ) -> Result<(), RuntimeError> {
666        let action = Action::SwitchPrimaryAgent {
667            session_id,
668            agent_id,
669        };
670        self.dispatch_action(session_id, action).await
671    }
672
673    pub async fn cancel_operation(
674        &self,
675        session_id: SessionId,
676        op_id: Option<OpId>,
677    ) -> Result<(), RuntimeError> {
678        let action = Action::Cancel { session_id, op_id };
679        self.dispatch_action(session_id, action).await
680    }
681
682    pub async fn submit_edited_message(
683        &self,
684        session_id: SessionId,
685        original_message_id: String,
686        new_content: String,
687        model: ModelId,
688    ) -> Result<OpId, RuntimeError> {
689        let op_id = OpId::new();
690        let new_message_id = MessageId::new();
691        let timestamp = current_timestamp();
692
693        let action = Action::UserEditedMessage {
694            session_id,
695            message_id: MessageId::from_string(original_message_id),
696            new_content,
697            op_id,
698            new_message_id,
699            model,
700            timestamp,
701        };
702
703        self.dispatch_action(session_id, action).await?;
704        Ok(op_id)
705    }
706
707    pub async fn submit_dequeue_queued_item(
708        &self,
709        session_id: SessionId,
710    ) -> Result<(), RuntimeError> {
711        let state = self.get_session_state(session_id).await?;
712        if state.queued_work.is_empty() {
713            return Err(RuntimeError::InvalidInput {
714                message: "No queued item to remove".to_string(),
715            });
716        }
717
718        let action = Action::DequeueQueuedItem { session_id };
719        self.dispatch_action(session_id, action).await
720    }
721
722    pub async fn compact_session(
723        &self,
724        session_id: SessionId,
725        model: ModelId,
726    ) -> Result<OpId, RuntimeError> {
727        let op_id = OpId::new();
728
729        let action = Action::RequestCompaction {
730            session_id,
731            op_id,
732            model,
733        };
734
735        self.dispatch_action(session_id, action).await?;
736        Ok(op_id)
737    }
738
739    pub async fn execute_bash_command(
740        &self,
741        session_id: SessionId,
742        command: String,
743    ) -> Result<OpId, RuntimeError> {
744        let op_id = OpId::new();
745        let message_id = MessageId::new();
746        let timestamp = current_timestamp();
747
748        let action = Action::DirectBashCommand {
749            session_id,
750            op_id,
751            message_id,
752            command,
753            timestamp,
754        };
755
756        self.dispatch_action(session_id, action).await?;
757        Ok(op_id)
758    }
759
760    pub async fn list_all_sessions(&self) -> Result<Vec<SessionId>, RuntimeError> {
761        let (reply_tx, reply_rx) = oneshot::channel();
762        self.tx
763            .send(SupervisorCmd::ListAllSessions { reply: reply_tx })
764            .await
765            .map_err(|_| RuntimeError::ChannelClosed)?;
766        reply_rx.await.map_err(|_| RuntimeError::ChannelClosed)?
767    }
768
769    pub async fn session_exists(&self, session_id: SessionId) -> Result<bool, RuntimeError> {
770        let (reply_tx, reply_rx) = oneshot::channel();
771        self.tx
772            .send(SupervisorCmd::SessionExists {
773                session_id,
774                reply: reply_tx,
775            })
776            .await
777            .map_err(|_| RuntimeError::ChannelClosed)?;
778        reply_rx.await.map_err(|_| RuntimeError::ChannelClosed)?
779    }
780
781    pub fn shutdown(&self) {
782        let _ = self.tx.try_send(SupervisorCmd::Shutdown);
783    }
784}
785
786pub struct RuntimeService {
787    pub handle: RuntimeHandle,
788    task: JoinHandle<()>,
789}
790
791impl RuntimeService {
792    pub fn spawn(
793        event_store: Arc<dyn EventStore>,
794        api_client: Arc<ApiClient>,
795        tool_executor: Arc<ToolExecutor>,
796    ) -> Self {
797        let (tx, rx) = mpsc::channel(64);
798
799        let supervisor = RuntimeSupervisor::new(event_store, api_client, tool_executor);
800        let task = tokio::spawn(supervisor.run(rx));
801
802        let handle = RuntimeHandle { tx };
803
804        Self { handle, task }
805    }
806
807    pub fn handle(&self) -> RuntimeHandle {
808        self.handle.clone()
809    }
810
811    pub async fn shutdown(self) {
812        self.handle.shutdown();
813        let _ = self.task.await;
814    }
815}
816
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch.
fn current_timestamp() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        // Clock is set before 1970: fall back to a zero duration.
        Err(_) => 0,
    }
}
823
#[cfg(test)]
mod tests {
    use super::*;
    use crate::app::domain::session::event_store::InMemoryEventStore;
    use crate::app::validation::ValidatorRegistry;
    use crate::tools::BackendRegistry;

    /// Builds the supervisor dependencies used by the tests: an in-memory event
    /// store, an API client built from empty registries, and a tool executor
    /// with empty backend and validator registries.
    async fn create_test_deps() -> (Arc<dyn EventStore>, Arc<ApiClient>, Arc<ToolExecutor>) {
        let event_store: Arc<dyn EventStore> = Arc::new(InMemoryEventStore::new());

        let model_registry = Arc::new(crate::model_registry::ModelRegistry::load(&[]).unwrap());
        let provider_registry = Arc::new(crate::auth::ProviderRegistry::load(&[]).unwrap());
        let llm_config_provider = crate::test_utils::test_llm_config_provider().unwrap();
        let api_client = Arc::new(ApiClient::new_with_deps(
            llm_config_provider,
            provider_registry,
            model_registry,
        ));

        let backends = Arc::new(BackendRegistry::new());
        let validators = Arc::new(ValidatorRegistry::new());
        let tool_executor = Arc::new(ToolExecutor::with_components(backends, validators));

        (event_store, api_client, tool_executor)
    }

    /// Session config pointing at the current directory as a local workspace,
    /// with every optional field left unset.
    fn test_session_config() -> SessionConfig {
        use crate::session::state::{SessionPolicyOverrides, SessionToolConfig, WorkspaceConfig};

        SessionConfig {
            default_model: crate::config::model::builtin::claude_sonnet_4_5(),
            workspace: WorkspaceConfig::Local {
                path: std::env::current_dir().unwrap(),
            },
            workspace_ref: None,
            workspace_id: None,
            repo_ref: None,
            parent_session_id: None,
            workspace_name: None,
            tool_config: SessionToolConfig::default(),
            system_prompt: None,
            primary_agent_id: None,
            policy_overrides: SessionPolicyOverrides::empty(),
            metadata: std::collections::HashMap::new(),
        }
    }

    /// Spawns a service on fresh test dependencies and creates one session.
    async fn spawn_service_with_session() -> (RuntimeService, SessionId) {
        let (event_store, api_client, tool_executor) = create_test_deps().await;
        let service = RuntimeService::spawn(event_store, api_client, tool_executor);

        let session_id = service
            .handle
            .create_session(test_session_config())
            .await
            .unwrap();

        (service, session_id)
    }

    #[tokio::test]
    async fn test_create_session() {
        let (service, session_id) = spawn_service_with_session().await;

        let active = service.handle.is_session_active(session_id).await.unwrap();
        assert!(active);

        service.shutdown().await;
    }

    #[tokio::test]
    async fn test_suspend_and_resume_session() {
        let (service, session_id) = spawn_service_with_session().await;
        let handle = &service.handle;

        handle.suspend_session(session_id).await.unwrap();
        assert!(!handle.is_session_active(session_id).await.unwrap());

        handle.resume_session(session_id).await.unwrap();
        assert!(handle.is_session_active(session_id).await.unwrap());

        service.shutdown().await;
    }

    #[tokio::test]
    async fn test_delete_session() {
        let (service, session_id) = spawn_service_with_session().await;
        let handle = &service.handle;

        handle.delete_session(session_id).await.unwrap();
        assert!(!handle.is_session_active(session_id).await.unwrap());

        // A deleted session can no longer be resumed.
        let result = handle.resume_session(session_id).await;
        assert!(matches!(result, Err(RuntimeError::SessionNotFound { .. })));

        service.shutdown().await;
    }

    #[tokio::test]
    async fn test_subscribe_events() {
        let (service, session_id) = spawn_service_with_session().await;

        let _subscription = service.handle.subscribe_events(session_id).await.unwrap();

        service.shutdown().await;
    }
}
939}