1use std::collections::HashMap;
2use std::sync::Arc;
3
4use thiserror::Error;
5use tokio::sync::{broadcast, mpsc, oneshot};
6use tokio::task::JoinHandle;
7
8use crate::api::Client as ApiClient;
9use crate::app::conversation::UserContent;
10use crate::app::domain::action::Action;
11use crate::app::domain::delta::StreamDelta;
12use crate::app::domain::event::SessionEvent;
13use crate::app::domain::reduce::apply_event_to_state;
14use crate::app::domain::session::EventStore;
15use crate::app::domain::state::AppState;
16use crate::app::domain::types::{MessageId, OpId, RequestId, SessionId};
17
18use crate::config::model::ModelId;
19use crate::primary_agents::{default_primary_agent_id, resolve_effective_config};
20use crate::prompts::system_prompt_for_model;
21use crate::session::state::SessionConfig;
22use crate::tools::ToolExecutor;
23use tracing::warn;
24
25use super::session_actor::{SessionActorHandle, SessionError, spawn_session_actor};
26use super::subscription::SessionEventSubscription;
27
28#[derive(Debug, Error)]
29pub enum RuntimeError {
30 #[error("Session not found: {session_id}")]
31 SessionNotFound { session_id: String },
32
33 #[error("Session already exists: {session_id}")]
34 SessionAlreadyExists { session_id: String },
35
36 #[error("Session error: {0}")]
37 Session(SessionError),
38
39 #[error("Event store error: {0}")]
40 EventStore(#[from] crate::app::domain::session::EventStoreError),
41
42 #[error("Channel closed")]
43 ChannelClosed,
44
45 #[error("Invalid input: {message}")]
46 InvalidInput { message: String },
47
48 #[error("Supervisor shutting down")]
49 ShuttingDown,
50}
51
52impl From<SessionError> for RuntimeError {
53 fn from(error: SessionError) -> Self {
54 match error {
55 SessionError::InvalidInput { message, .. } => RuntimeError::InvalidInput { message },
56 other => RuntimeError::Session(other),
57 }
58 }
59}
60
61pub(crate) enum SupervisorCmd {
62 CreateSession {
63 config: Box<SessionConfig>,
64 reply: oneshot::Sender<Result<SessionId, RuntimeError>>,
65 },
66 ResumeSession {
67 session_id: SessionId,
68 reply: oneshot::Sender<Result<(), RuntimeError>>,
69 },
70 SuspendSession {
71 session_id: SessionId,
72 reply: oneshot::Sender<Result<(), RuntimeError>>,
73 },
74 DeleteSession {
75 session_id: SessionId,
76 reply: oneshot::Sender<Result<(), RuntimeError>>,
77 },
78 DispatchAction {
79 session_id: SessionId,
80 action: Box<Action>,
81 reply: oneshot::Sender<Result<(), RuntimeError>>,
82 },
83 SubscribeEvents {
84 session_id: SessionId,
85 reply: oneshot::Sender<Result<SessionEventSubscription, RuntimeError>>,
86 },
87 SubscribeDeltas {
88 session_id: SessionId,
89 reply: oneshot::Sender<Result<broadcast::Receiver<StreamDelta>, RuntimeError>>,
90 },
91 LoadEventsAfter {
92 session_id: SessionId,
93 after_seq: u64,
94 reply: oneshot::Sender<Result<Vec<(u64, SessionEvent)>, RuntimeError>>,
95 },
96 GetSessionState {
97 session_id: SessionId,
98 reply: oneshot::Sender<Result<AppState, RuntimeError>>,
99 },
100 IsSessionActive {
101 session_id: SessionId,
102 reply: oneshot::Sender<bool>,
103 },
104 ListActiveSessions {
105 reply: oneshot::Sender<Vec<SessionId>>,
106 },
107 ListAllSessions {
108 reply: oneshot::Sender<Result<Vec<SessionId>, RuntimeError>>,
109 },
110 SessionExists {
111 session_id: SessionId,
112 reply: oneshot::Sender<Result<bool, RuntimeError>>,
113 },
114 Shutdown,
115}
116
117struct RuntimeSupervisor {
118 sessions: HashMap<SessionId, SessionActorHandle>,
119 event_store: Arc<dyn EventStore>,
120 api_client: Arc<ApiClient>,
121 tool_executor: Arc<ToolExecutor>,
122}
123
124impl RuntimeSupervisor {
125 fn new(
126 event_store: Arc<dyn EventStore>,
127 api_client: Arc<ApiClient>,
128 tool_executor: Arc<ToolExecutor>,
129 ) -> Self {
130 Self {
131 sessions: HashMap::new(),
132 event_store,
133 api_client,
134 tool_executor,
135 }
136 }
137
138 async fn run(mut self, mut cmd_rx: mpsc::Receiver<SupervisorCmd>) {
139 loop {
140 tokio::select! {
141 Some(cmd) = cmd_rx.recv() => {
142 match cmd {
143 SupervisorCmd::CreateSession { config, reply } => {
144 let result = self.create_session(*config).await;
145 let _ = reply.send(result);
146 }
147 SupervisorCmd::ResumeSession { session_id, reply } => {
148 let result = self.resume_session(session_id).await;
149 let _ = reply.send(result);
150 }
151 SupervisorCmd::SuspendSession { session_id, reply } => {
152 let result = self.suspend_session(session_id).await;
153 let _ = reply.send(result);
154 }
155 SupervisorCmd::DeleteSession { session_id, reply } => {
156 let result = self.delete_session(session_id).await;
157 let _ = reply.send(result);
158 }
159 SupervisorCmd::DispatchAction { session_id, action, reply } => {
160 let result = self.dispatch_action(session_id, *action).await;
161 let _ = reply.send(result);
162 }
163 SupervisorCmd::SubscribeEvents { session_id, reply } => {
164 let result = self.subscribe_events(session_id).await;
165 let _ = reply.send(result);
166 }
167 SupervisorCmd::SubscribeDeltas { session_id, reply } => {
168 let result = self.subscribe_deltas(session_id).await;
169 let _ = reply.send(result);
170 }
171 SupervisorCmd::LoadEventsAfter {
172 session_id,
173 after_seq,
174 reply,
175 } => {
176 let result = self
177 .event_store
178 .load_events_after(session_id, after_seq)
179 .await
180 .map_err(RuntimeError::from);
181 let _ = reply.send(result);
182 }
183 SupervisorCmd::GetSessionState { session_id, reply } => {
184 let result = self.get_session_state(session_id).await;
185 let _ = reply.send(result);
186 }
187 SupervisorCmd::IsSessionActive { session_id, reply } => {
188 let is_active = self.sessions.contains_key(&session_id);
189 let _ = reply.send(is_active);
190 }
191 SupervisorCmd::ListActiveSessions { reply } => {
192 let sessions: Vec<SessionId> = self.sessions.keys().copied().collect();
193 let _ = reply.send(sessions);
194 }
195 SupervisorCmd::ListAllSessions { reply } => {
196 let result = self.event_store.list_session_ids().await
197 .map_err(RuntimeError::from);
198 let _ = reply.send(result);
199 }
200 SupervisorCmd::SessionExists { session_id, reply } => {
201 let result = self.event_store.session_exists(session_id).await
202 .map_err(RuntimeError::from);
203 let _ = reply.send(result);
204 }
205 SupervisorCmd::Shutdown => {
206 self.shutdown_all().await;
207 break;
208 }
209 }
210 }
211 else => break,
212 }
213 }
214
215 tracing::info!("Runtime supervisor stopped");
216 }
217
218 async fn create_session(&mut self, config: SessionConfig) -> Result<SessionId, RuntimeError> {
219 let session_id = SessionId::new();
220
221 self.event_store.create_session(session_id).await?;
222
223 let mut config = config;
224
225 if config.primary_agent_id.is_none() {
226 config.primary_agent_id = Some(default_primary_agent_id().to_string());
227 }
228 let mut config = resolve_effective_config(&config);
229
230 let system_context = self.resolve_system_context(&config).await;
231 if let Some(context) = &system_context {
232 config.system_prompt = Some(context.prompt.clone());
233 }
234
235 let session_created_event = SessionEvent::SessionCreated {
236 config: Box::new(config.clone()),
237 metadata: config.metadata.clone(),
238 parent_session_id: None,
239 };
240 self.event_store
241 .append(session_id, &session_created_event)
242 .await?;
243
244 let mut state = AppState::new(session_id);
245 state.apply_session_config(&config, config.primary_agent_id.clone(), true);
246 if let Some(system_context) = system_context {
247 state.cached_system_context = Some(system_context);
248 }
249
250 let handle = spawn_session_actor(
251 session_id,
252 state,
253 self.event_store.clone(),
254 self.api_client.clone(),
255 self.tool_executor.clone(),
256 );
257 self.sessions.insert(session_id, handle);
258
259 tracing::info!(session_id = %session_id, "Created session");
260
261 Ok(session_id)
262 }
263
264 async fn resume_session(&mut self, session_id: SessionId) -> Result<(), RuntimeError> {
265 if self.sessions.contains_key(&session_id) {
266 return Ok(());
267 }
268
269 if !self.event_store.session_exists(session_id).await? {
270 return Err(RuntimeError::SessionNotFound {
271 session_id: session_id.to_string(),
272 });
273 }
274
275 let events = self.event_store.load_events(session_id).await?;
276
277 let mut state = AppState::new(session_id);
278 for (_, event) in &events {
279 apply_event_to_state(&mut state, event);
280 }
281
282 if let Some(config) = state.session_config.clone() {
283 let mut resolved = resolve_effective_config(&config);
284 let system_context = self.resolve_system_context(&resolved).await;
285 if let Some(context) = &system_context {
286 resolved.system_prompt = Some(context.prompt.clone());
287 }
288 state.apply_session_config(&resolved, resolved.primary_agent_id.clone(), false);
289 state.cached_system_context = system_context;
290 }
291
292 let should_drain_queue = !state.has_active_operation() && !state.queued_work.is_empty();
293 let handle = spawn_session_actor(
294 session_id,
295 state,
296 self.event_store.clone(),
297 self.api_client.clone(),
298 self.tool_executor.clone(),
299 );
300
301 if should_drain_queue {
302 handle
303 .dispatch(Action::DrainQueuedWork { session_id })
304 .await?;
305 }
306
307 self.sessions.insert(session_id, handle);
308
309 tracing::info!(
310 session_id = %session_id,
311 event_count = events.len(),
312 "Resumed session"
313 );
314
315 Ok(())
316 }
317
318 async fn resolve_system_context(
319 &self,
320 config: &SessionConfig,
321 ) -> Option<crate::app::SystemContext> {
322 let prompt = config
323 .system_prompt
324 .as_ref()
325 .and_then(|prompt| {
326 if prompt.trim().is_empty() {
327 None
328 } else {
329 Some(prompt.clone())
330 }
331 })
332 .unwrap_or_else(|| system_prompt_for_model(&config.default_model));
333
334 let workspace = match self.tool_executor.workspace() {
335 Some(workspace) => workspace,
336 None => return Some(crate::app::SystemContext::new(prompt)),
337 };
338
339 let environment = match workspace.environment().await {
340 Ok(env_info) => Some(env_info),
341 Err(error) => {
342 warn!(error = %error, "Failed to collect environment info for system context");
343 None
344 }
345 };
346
347 Some(crate::app::SystemContext::with_environment(
348 prompt,
349 environment,
350 ))
351 }
352
353 async fn suspend_session(&mut self, session_id: SessionId) -> Result<(), RuntimeError> {
354 if let Some(handle) = self.sessions.remove(&session_id) {
355 let _ = handle.suspend().await;
356 tracing::info!(session_id = %session_id, "Suspended session");
357 }
358 Ok(())
359 }
360
361 async fn delete_session(&mut self, session_id: SessionId) -> Result<(), RuntimeError> {
362 if let Some(handle) = self.sessions.remove(&session_id) {
363 handle.shutdown();
364 }
365
366 self.event_store.delete_session(session_id).await?;
367
368 tracing::info!(session_id = %session_id, "Deleted session");
369
370 Ok(())
371 }
372
373 async fn dispatch_action(
374 &mut self,
375 session_id: SessionId,
376 action: Action,
377 ) -> Result<(), RuntimeError> {
378 if !self.sessions.contains_key(&session_id) {
379 self.resume_session(session_id).await?;
380 }
381
382 let handle =
383 self.sessions
384 .get(&session_id)
385 .ok_or_else(|| RuntimeError::SessionNotFound {
386 session_id: session_id.to_string(),
387 })?;
388
389 handle.dispatch(action).await?;
390
391 Ok(())
392 }
393
394 async fn subscribe_events(
395 &mut self,
396 session_id: SessionId,
397 ) -> Result<SessionEventSubscription, RuntimeError> {
398 if !self.sessions.contains_key(&session_id) {
399 self.resume_session(session_id).await?;
400 }
401
402 let handle =
403 self.sessions
404 .get(&session_id)
405 .ok_or_else(|| RuntimeError::SessionNotFound {
406 session_id: session_id.to_string(),
407 })?;
408
409 let subscription = handle.subscribe().await?;
410
411 Ok(subscription)
412 }
413
414 async fn subscribe_deltas(
415 &mut self,
416 session_id: SessionId,
417 ) -> Result<broadcast::Receiver<StreamDelta>, RuntimeError> {
418 if !self.sessions.contains_key(&session_id) {
419 self.resume_session(session_id).await?;
420 }
421
422 let handle =
423 self.sessions
424 .get(&session_id)
425 .ok_or_else(|| RuntimeError::SessionNotFound {
426 session_id: session_id.to_string(),
427 })?;
428
429 let delta_rx = handle.subscribe_deltas().await?;
430
431 Ok(delta_rx)
432 }
433
434 async fn get_session_state(&mut self, session_id: SessionId) -> Result<AppState, RuntimeError> {
435 if !self.sessions.contains_key(&session_id) {
436 self.resume_session(session_id).await?;
437 }
438
439 let handle =
440 self.sessions
441 .get(&session_id)
442 .ok_or_else(|| RuntimeError::SessionNotFound {
443 session_id: session_id.to_string(),
444 })?;
445
446 let state = handle.get_state().await?;
447
448 Ok(state)
449 }
450
451 async fn shutdown_all(&mut self) {
452 for (session_id, handle) in self.sessions.drain() {
453 handle.shutdown();
454 tracing::debug!(session_id = %session_id, "Shutting down session");
455 }
456 }
457}
458
459#[derive(Clone)]
460pub struct RuntimeHandle {
461 tx: mpsc::Sender<SupervisorCmd>,
462}
463
464impl RuntimeHandle {
465 pub async fn create_session(&self, config: SessionConfig) -> Result<SessionId, RuntimeError> {
466 let (reply_tx, reply_rx) = oneshot::channel();
467 self.tx
468 .send(SupervisorCmd::CreateSession {
469 config: Box::new(config),
470 reply: reply_tx,
471 })
472 .await
473 .map_err(|_| RuntimeError::ChannelClosed)?;
474 reply_rx.await.map_err(|_| RuntimeError::ChannelClosed)?
475 }
476
477 pub async fn resume_session(&self, session_id: SessionId) -> Result<(), RuntimeError> {
478 let (reply_tx, reply_rx) = oneshot::channel();
479 self.tx
480 .send(SupervisorCmd::ResumeSession {
481 session_id,
482 reply: reply_tx,
483 })
484 .await
485 .map_err(|_| RuntimeError::ChannelClosed)?;
486 reply_rx.await.map_err(|_| RuntimeError::ChannelClosed)?
487 }
488
489 pub async fn suspend_session(&self, session_id: SessionId) -> Result<(), RuntimeError> {
490 let (reply_tx, reply_rx) = oneshot::channel();
491 self.tx
492 .send(SupervisorCmd::SuspendSession {
493 session_id,
494 reply: reply_tx,
495 })
496 .await
497 .map_err(|_| RuntimeError::ChannelClosed)?;
498 reply_rx.await.map_err(|_| RuntimeError::ChannelClosed)?
499 }
500
501 pub async fn delete_session(&self, session_id: SessionId) -> Result<(), RuntimeError> {
502 let (reply_tx, reply_rx) = oneshot::channel();
503 self.tx
504 .send(SupervisorCmd::DeleteSession {
505 session_id,
506 reply: reply_tx,
507 })
508 .await
509 .map_err(|_| RuntimeError::ChannelClosed)?;
510 reply_rx.await.map_err(|_| RuntimeError::ChannelClosed)?
511 }
512
513 pub async fn dispatch_action(
514 &self,
515 session_id: SessionId,
516 action: Action,
517 ) -> Result<(), RuntimeError> {
518 let (reply_tx, reply_rx) = oneshot::channel();
519 self.tx
520 .send(SupervisorCmd::DispatchAction {
521 session_id,
522 action: Box::new(action),
523 reply: reply_tx,
524 })
525 .await
526 .map_err(|_| RuntimeError::ChannelClosed)?;
527 reply_rx.await.map_err(|_| RuntimeError::ChannelClosed)?
528 }
529
530 pub async fn subscribe_events(
531 &self,
532 session_id: SessionId,
533 ) -> Result<SessionEventSubscription, RuntimeError> {
534 let (reply_tx, reply_rx) = oneshot::channel();
535 self.tx
536 .send(SupervisorCmd::SubscribeEvents {
537 session_id,
538 reply: reply_tx,
539 })
540 .await
541 .map_err(|_| RuntimeError::ChannelClosed)?;
542 reply_rx.await.map_err(|_| RuntimeError::ChannelClosed)?
543 }
544
545 pub async fn subscribe_deltas(
546 &self,
547 session_id: SessionId,
548 ) -> Result<broadcast::Receiver<StreamDelta>, RuntimeError> {
549 let (reply_tx, reply_rx) = oneshot::channel();
550 self.tx
551 .send(SupervisorCmd::SubscribeDeltas {
552 session_id,
553 reply: reply_tx,
554 })
555 .await
556 .map_err(|_| RuntimeError::ChannelClosed)?;
557 reply_rx.await.map_err(|_| RuntimeError::ChannelClosed)?
558 }
559
560 pub async fn load_events_after(
561 &self,
562 session_id: SessionId,
563 after_seq: u64,
564 ) -> Result<Vec<(u64, SessionEvent)>, RuntimeError> {
565 let (reply_tx, reply_rx) = oneshot::channel();
566 self.tx
567 .send(SupervisorCmd::LoadEventsAfter {
568 session_id,
569 after_seq,
570 reply: reply_tx,
571 })
572 .await
573 .map_err(|_| RuntimeError::ChannelClosed)?;
574 reply_rx.await.map_err(|_| RuntimeError::ChannelClosed)?
575 }
576
577 pub async fn get_session_state(&self, session_id: SessionId) -> Result<AppState, RuntimeError> {
578 let (reply_tx, reply_rx) = oneshot::channel();
579 self.tx
580 .send(SupervisorCmd::GetSessionState {
581 session_id,
582 reply: reply_tx,
583 })
584 .await
585 .map_err(|_| RuntimeError::ChannelClosed)?;
586 reply_rx.await.map_err(|_| RuntimeError::ChannelClosed)?
587 }
588
589 pub async fn is_session_active(&self, session_id: SessionId) -> Result<bool, RuntimeError> {
590 let (reply_tx, reply_rx) = oneshot::channel();
591 self.tx
592 .send(SupervisorCmd::IsSessionActive {
593 session_id,
594 reply: reply_tx,
595 })
596 .await
597 .map_err(|_| RuntimeError::ChannelClosed)?;
598 reply_rx.await.map_err(|_| RuntimeError::ChannelClosed)
599 }
600
601 pub async fn list_active_sessions(&self) -> Result<Vec<SessionId>, RuntimeError> {
602 let (reply_tx, reply_rx) = oneshot::channel();
603 self.tx
604 .send(SupervisorCmd::ListActiveSessions { reply: reply_tx })
605 .await
606 .map_err(|_| RuntimeError::ChannelClosed)?;
607 reply_rx.await.map_err(|_| RuntimeError::ChannelClosed)
608 }
609
610 pub async fn submit_user_input(
611 &self,
612 session_id: SessionId,
613 content: Vec<UserContent>,
614 model: ModelId,
615 ) -> Result<OpId, RuntimeError> {
616 let has_text = content
617 .iter()
618 .any(|item| matches!(item, UserContent::Text { text } if !text.trim().is_empty()));
619 let has_non_text = content
620 .iter()
621 .any(|item| !matches!(item, UserContent::Text { .. }));
622 if !has_text && !has_non_text {
623 return Err(RuntimeError::InvalidInput {
624 message: "Input text cannot be empty".to_string(),
625 });
626 }
627
628 let op_id = OpId::new();
629 let message_id = MessageId::new();
630 let timestamp = current_timestamp();
631
632 let action = Action::UserInput {
633 session_id,
634 content,
635 op_id,
636 message_id,
637 model,
638 timestamp,
639 };
640
641 self.dispatch_action(session_id, action).await?;
642
643 Ok(op_id)
644 }
645
646 pub async fn submit_tool_approval(
647 &self,
648 session_id: SessionId,
649 request_id: RequestId,
650 approved: bool,
651 remember: Option<crate::app::domain::action::ApprovalMemory>,
652 ) -> Result<(), RuntimeError> {
653 use crate::app::domain::action::ApprovalDecision;
654
655 let decision = if approved {
656 ApprovalDecision::Approved
657 } else {
658 ApprovalDecision::Denied
659 };
660
661 let action = Action::ToolApprovalDecided {
662 session_id,
663 request_id,
664 decision,
665 remember,
666 };
667
668 self.dispatch_action(session_id, action).await
669 }
670
671 pub async fn switch_primary_agent(
672 &self,
673 session_id: SessionId,
674 agent_id: String,
675 ) -> Result<(), RuntimeError> {
676 let action = Action::SwitchPrimaryAgent {
677 session_id,
678 agent_id,
679 };
680 self.dispatch_action(session_id, action).await
681 }
682
683 pub async fn cancel_operation(
684 &self,
685 session_id: SessionId,
686 op_id: Option<OpId>,
687 ) -> Result<(), RuntimeError> {
688 let action = Action::Cancel { session_id, op_id };
689 self.dispatch_action(session_id, action).await
690 }
691
692 pub async fn submit_edited_message(
693 &self,
694 session_id: SessionId,
695 original_message_id: String,
696 new_content: Vec<UserContent>,
697 model: ModelId,
698 ) -> Result<OpId, RuntimeError> {
699 let op_id = OpId::new();
700 let new_message_id = MessageId::new();
701 let timestamp = current_timestamp();
702
703 let action = Action::UserEditedMessage {
704 session_id,
705 message_id: MessageId::from_string(original_message_id),
706 new_content,
707 op_id,
708 new_message_id,
709 model,
710 timestamp,
711 };
712
713 self.dispatch_action(session_id, action).await?;
714 Ok(op_id)
715 }
716
717 pub async fn submit_dequeue_queued_item(
718 &self,
719 session_id: SessionId,
720 ) -> Result<(), RuntimeError> {
721 let state = self.get_session_state(session_id).await?;
722 if state.queued_work.is_empty() {
723 return Err(RuntimeError::InvalidInput {
724 message: "No queued item to remove".to_string(),
725 });
726 }
727
728 let action = Action::DequeueQueuedItem { session_id };
729 self.dispatch_action(session_id, action).await
730 }
731
732 pub async fn compact_session(
733 &self,
734 session_id: SessionId,
735 model: ModelId,
736 ) -> Result<OpId, RuntimeError> {
737 let op_id = OpId::new();
738
739 let action = Action::RequestCompaction {
740 session_id,
741 op_id,
742 model,
743 };
744
745 self.dispatch_action(session_id, action).await?;
746 Ok(op_id)
747 }
748
749 pub async fn execute_bash_command(
750 &self,
751 session_id: SessionId,
752 command: String,
753 ) -> Result<OpId, RuntimeError> {
754 let op_id = OpId::new();
755 let message_id = MessageId::new();
756 let timestamp = current_timestamp();
757
758 let action = Action::DirectBashCommand {
759 session_id,
760 op_id,
761 message_id,
762 command,
763 timestamp,
764 };
765
766 self.dispatch_action(session_id, action).await?;
767 Ok(op_id)
768 }
769
770 pub async fn list_all_sessions(&self) -> Result<Vec<SessionId>, RuntimeError> {
771 let (reply_tx, reply_rx) = oneshot::channel();
772 self.tx
773 .send(SupervisorCmd::ListAllSessions { reply: reply_tx })
774 .await
775 .map_err(|_| RuntimeError::ChannelClosed)?;
776 reply_rx.await.map_err(|_| RuntimeError::ChannelClosed)?
777 }
778
779 pub async fn session_exists(&self, session_id: SessionId) -> Result<bool, RuntimeError> {
780 let (reply_tx, reply_rx) = oneshot::channel();
781 self.tx
782 .send(SupervisorCmd::SessionExists {
783 session_id,
784 reply: reply_tx,
785 })
786 .await
787 .map_err(|_| RuntimeError::ChannelClosed)?;
788 reply_rx.await.map_err(|_| RuntimeError::ChannelClosed)?
789 }
790
791 pub fn shutdown(&self) {
792 let _ = self.tx.try_send(SupervisorCmd::Shutdown);
793 }
794}
795
796pub struct RuntimeService {
797 pub handle: RuntimeHandle,
798 task: JoinHandle<()>,
799}
800
801impl RuntimeService {
802 pub fn spawn(
803 event_store: Arc<dyn EventStore>,
804 api_client: Arc<ApiClient>,
805 tool_executor: Arc<ToolExecutor>,
806 ) -> Self {
807 let (tx, rx) = mpsc::channel(64);
808
809 let supervisor = RuntimeSupervisor::new(event_store, api_client, tool_executor);
810 let task = tokio::spawn(supervisor.run(rx));
811
812 let handle = RuntimeHandle { tx };
813
814 Self { handle, task }
815 }
816
817 pub fn handle(&self) -> RuntimeHandle {
818 self.handle.clone()
819 }
820
821 pub async fn shutdown(self) {
822 self.handle.shutdown();
823 let _ = self.task.await;
824 }
825}
826
827fn current_timestamp() -> u64 {
828 std::time::SystemTime::now()
829 .duration_since(std::time::UNIX_EPOCH)
830 .unwrap_or_default()
831 .as_secs()
832}
833
834#[cfg(test)]
835mod tests {
836 use super::*;
837 use crate::app::domain::session::event_store::InMemoryEventStore;
838 use crate::app::validation::ValidatorRegistry;
839 use crate::tools::BackendRegistry;
840
841 async fn create_test_deps() -> (Arc<dyn EventStore>, Arc<ApiClient>, Arc<ToolExecutor>) {
842 let event_store = Arc::new(InMemoryEventStore::new());
843 let model_registry = Arc::new(crate::model_registry::ModelRegistry::load(&[]).unwrap());
844 let provider_registry = Arc::new(crate::auth::ProviderRegistry::load(&[]).unwrap());
845 let api_client = Arc::new(ApiClient::new_with_deps(
846 crate::test_utils::test_llm_config_provider().unwrap(),
847 provider_registry,
848 model_registry,
849 ));
850
851 let tool_executor = Arc::new(ToolExecutor::with_components(
852 Arc::new(BackendRegistry::new()),
853 Arc::new(ValidatorRegistry::new()),
854 ));
855
856 (event_store, api_client, tool_executor)
857 }
858
859 fn test_session_config() -> SessionConfig {
860 SessionConfig {
861 default_model: crate::config::model::builtin::claude_sonnet_4_5(),
862 workspace: crate::session::state::WorkspaceConfig::Local {
863 path: std::env::current_dir().unwrap(),
864 },
865 workspace_ref: None,
866 workspace_id: None,
867 repo_ref: None,
868 parent_session_id: None,
869 workspace_name: None,
870 tool_config: crate::session::state::SessionToolConfig::default(),
871 system_prompt: None,
872 primary_agent_id: None,
873 policy_overrides: crate::session::state::SessionPolicyOverrides::empty(),
874 metadata: std::collections::HashMap::new(),
875 }
876 }
877
878 #[tokio::test]
879 async fn test_create_session() {
880 let (event_store, api_client, tool_executor) = create_test_deps().await;
881 let service = RuntimeService::spawn(event_store, api_client, tool_executor);
882
883 let session_id = service
884 .handle
885 .create_session(test_session_config())
886 .await
887 .unwrap();
888
889 assert!(service.handle.is_session_active(session_id).await.unwrap());
890
891 service.shutdown().await;
892 }
893
894 #[tokio::test]
895 async fn test_suspend_and_resume_session() {
896 let (event_store, api_client, tool_executor) = create_test_deps().await;
897 let service = RuntimeService::spawn(event_store, api_client, tool_executor);
898
899 let session_id = service
900 .handle
901 .create_session(test_session_config())
902 .await
903 .unwrap();
904
905 service.handle.suspend_session(session_id).await.unwrap();
906 assert!(!service.handle.is_session_active(session_id).await.unwrap());
907
908 service.handle.resume_session(session_id).await.unwrap();
909 assert!(service.handle.is_session_active(session_id).await.unwrap());
910
911 service.shutdown().await;
912 }
913
914 #[tokio::test]
915 async fn test_delete_session() {
916 let (event_store, api_client, tool_executor) = create_test_deps().await;
917 let service = RuntimeService::spawn(event_store, api_client, tool_executor);
918
919 let session_id = service
920 .handle
921 .create_session(test_session_config())
922 .await
923 .unwrap();
924
925 service.handle.delete_session(session_id).await.unwrap();
926 assert!(!service.handle.is_session_active(session_id).await.unwrap());
927
928 let result = service.handle.resume_session(session_id).await;
929 assert!(matches!(result, Err(RuntimeError::SessionNotFound { .. })));
930
931 service.shutdown().await;
932 }
933
934 #[tokio::test]
935 async fn test_subscribe_events() {
936 let (event_store, api_client, tool_executor) = create_test_deps().await;
937 let service = RuntimeService::spawn(event_store, api_client, tool_executor);
938
939 let session_id = service
940 .handle
941 .create_session(test_session_config())
942 .await
943 .unwrap();
944
945 let _subscription = service.handle.subscribe_events(session_id).await.unwrap();
946
947 service.shutdown().await;
948 }
949}