1use std::collections::HashMap;
2use std::sync::Arc;
3
4use thiserror::Error;
5use tokio::sync::{broadcast, mpsc, oneshot};
6use tokio::task::JoinHandle;
7
8use crate::api::Client as ApiClient;
9use crate::app::domain::action::Action;
10use crate::app::domain::delta::StreamDelta;
11use crate::app::domain::event::SessionEvent;
12use crate::app::domain::reduce::apply_event_to_state;
13use crate::app::domain::session::EventStore;
14use crate::app::domain::state::AppState;
15use crate::app::domain::types::{MessageId, NonEmptyString, OpId, RequestId, SessionId};
16use crate::config::model::ModelId;
17use crate::primary_agents::{default_primary_agent_id, resolve_effective_config};
18use crate::prompts::system_prompt_for_model;
19use crate::session::state::SessionConfig;
20use crate::tools::ToolExecutor;
21use tracing::warn;
22
23use super::session_actor::{SessionActorHandle, SessionError, spawn_session_actor};
24use super::subscription::SessionEventSubscription;
25
26#[derive(Debug, Error)]
27pub enum RuntimeError {
28 #[error("Session not found: {session_id}")]
29 SessionNotFound { session_id: String },
30
31 #[error("Session already exists: {session_id}")]
32 SessionAlreadyExists { session_id: String },
33
34 #[error("Session error: {0}")]
35 Session(SessionError),
36
37 #[error("Event store error: {0}")]
38 EventStore(#[from] crate::app::domain::session::EventStoreError),
39
40 #[error("Channel closed")]
41 ChannelClosed,
42
43 #[error("Invalid input: {message}")]
44 InvalidInput { message: String },
45
46 #[error("Supervisor shutting down")]
47 ShuttingDown,
48}
49
50impl From<SessionError> for RuntimeError {
51 fn from(error: SessionError) -> Self {
52 match error {
53 SessionError::InvalidInput { message, .. } => RuntimeError::InvalidInput { message },
54 other => RuntimeError::Session(other),
55 }
56 }
57}
58
59pub(crate) enum SupervisorCmd {
60 CreateSession {
61 config: Box<SessionConfig>,
62 reply: oneshot::Sender<Result<SessionId, RuntimeError>>,
63 },
64 ResumeSession {
65 session_id: SessionId,
66 reply: oneshot::Sender<Result<(), RuntimeError>>,
67 },
68 SuspendSession {
69 session_id: SessionId,
70 reply: oneshot::Sender<Result<(), RuntimeError>>,
71 },
72 DeleteSession {
73 session_id: SessionId,
74 reply: oneshot::Sender<Result<(), RuntimeError>>,
75 },
76 DispatchAction {
77 session_id: SessionId,
78 action: Box<Action>,
79 reply: oneshot::Sender<Result<(), RuntimeError>>,
80 },
81 SubscribeEvents {
82 session_id: SessionId,
83 reply: oneshot::Sender<Result<SessionEventSubscription, RuntimeError>>,
84 },
85 SubscribeDeltas {
86 session_id: SessionId,
87 reply: oneshot::Sender<Result<broadcast::Receiver<StreamDelta>, RuntimeError>>,
88 },
89 LoadEventsAfter {
90 session_id: SessionId,
91 after_seq: u64,
92 reply: oneshot::Sender<Result<Vec<(u64, SessionEvent)>, RuntimeError>>,
93 },
94 GetSessionState {
95 session_id: SessionId,
96 reply: oneshot::Sender<Result<AppState, RuntimeError>>,
97 },
98 IsSessionActive {
99 session_id: SessionId,
100 reply: oneshot::Sender<bool>,
101 },
102 ListActiveSessions {
103 reply: oneshot::Sender<Vec<SessionId>>,
104 },
105 ListAllSessions {
106 reply: oneshot::Sender<Result<Vec<SessionId>, RuntimeError>>,
107 },
108 SessionExists {
109 session_id: SessionId,
110 reply: oneshot::Sender<Result<bool, RuntimeError>>,
111 },
112 Shutdown,
113}
114
115struct RuntimeSupervisor {
116 sessions: HashMap<SessionId, SessionActorHandle>,
117 event_store: Arc<dyn EventStore>,
118 api_client: Arc<ApiClient>,
119 tool_executor: Arc<ToolExecutor>,
120}
121
122impl RuntimeSupervisor {
123 fn new(
124 event_store: Arc<dyn EventStore>,
125 api_client: Arc<ApiClient>,
126 tool_executor: Arc<ToolExecutor>,
127 ) -> Self {
128 Self {
129 sessions: HashMap::new(),
130 event_store,
131 api_client,
132 tool_executor,
133 }
134 }
135
136 async fn run(mut self, mut cmd_rx: mpsc::Receiver<SupervisorCmd>) {
137 loop {
138 tokio::select! {
139 Some(cmd) = cmd_rx.recv() => {
140 match cmd {
141 SupervisorCmd::CreateSession { config, reply } => {
142 let result = self.create_session(*config).await;
143 let _ = reply.send(result);
144 }
145 SupervisorCmd::ResumeSession { session_id, reply } => {
146 let result = self.resume_session(session_id).await;
147 let _ = reply.send(result);
148 }
149 SupervisorCmd::SuspendSession { session_id, reply } => {
150 let result = self.suspend_session(session_id).await;
151 let _ = reply.send(result);
152 }
153 SupervisorCmd::DeleteSession { session_id, reply } => {
154 let result = self.delete_session(session_id).await;
155 let _ = reply.send(result);
156 }
157 SupervisorCmd::DispatchAction { session_id, action, reply } => {
158 let result = self.dispatch_action(session_id, *action).await;
159 let _ = reply.send(result);
160 }
161 SupervisorCmd::SubscribeEvents { session_id, reply } => {
162 let result = self.subscribe_events(session_id).await;
163 let _ = reply.send(result);
164 }
165 SupervisorCmd::SubscribeDeltas { session_id, reply } => {
166 let result = self.subscribe_deltas(session_id).await;
167 let _ = reply.send(result);
168 }
169 SupervisorCmd::LoadEventsAfter {
170 session_id,
171 after_seq,
172 reply,
173 } => {
174 let result = self
175 .event_store
176 .load_events_after(session_id, after_seq)
177 .await
178 .map_err(RuntimeError::from);
179 let _ = reply.send(result);
180 }
181 SupervisorCmd::GetSessionState { session_id, reply } => {
182 let result = self.get_session_state(session_id).await;
183 let _ = reply.send(result);
184 }
185 SupervisorCmd::IsSessionActive { session_id, reply } => {
186 let is_active = self.sessions.contains_key(&session_id);
187 let _ = reply.send(is_active);
188 }
189 SupervisorCmd::ListActiveSessions { reply } => {
190 let sessions: Vec<SessionId> = self.sessions.keys().copied().collect();
191 let _ = reply.send(sessions);
192 }
193 SupervisorCmd::ListAllSessions { reply } => {
194 let result = self.event_store.list_session_ids().await
195 .map_err(RuntimeError::from);
196 let _ = reply.send(result);
197 }
198 SupervisorCmd::SessionExists { session_id, reply } => {
199 let result = self.event_store.session_exists(session_id).await
200 .map_err(RuntimeError::from);
201 let _ = reply.send(result);
202 }
203 SupervisorCmd::Shutdown => {
204 self.shutdown_all().await;
205 break;
206 }
207 }
208 }
209 else => break,
210 }
211 }
212
213 tracing::info!("Runtime supervisor stopped");
214 }
215
216 async fn create_session(&mut self, config: SessionConfig) -> Result<SessionId, RuntimeError> {
217 let session_id = SessionId::new();
218
219 self.event_store.create_session(session_id).await?;
220
221 let mut config = config;
222
223 if config.primary_agent_id.is_none() {
224 config.primary_agent_id = Some(default_primary_agent_id().to_string());
225 }
226 let mut config = resolve_effective_config(&config);
227
228 let system_context = self.resolve_system_context(&config).await;
229 if let Some(context) = &system_context {
230 config.system_prompt = Some(context.prompt.clone());
231 }
232
233 let session_created_event = SessionEvent::SessionCreated {
234 config: Box::new(config.clone()),
235 metadata: config.metadata.clone(),
236 parent_session_id: None,
237 };
238 self.event_store
239 .append(session_id, &session_created_event)
240 .await?;
241
242 let mut state = AppState::new(session_id);
243 state.apply_session_config(&config, config.primary_agent_id.clone(), true);
244 if let Some(system_context) = system_context {
245 state.cached_system_context = Some(system_context);
246 }
247
248 let handle = spawn_session_actor(
249 session_id,
250 state,
251 self.event_store.clone(),
252 self.api_client.clone(),
253 self.tool_executor.clone(),
254 );
255 self.sessions.insert(session_id, handle);
256
257 tracing::info!(session_id = %session_id, "Created session");
258
259 Ok(session_id)
260 }
261
262 async fn resume_session(&mut self, session_id: SessionId) -> Result<(), RuntimeError> {
263 if self.sessions.contains_key(&session_id) {
264 return Ok(());
265 }
266
267 if !self.event_store.session_exists(session_id).await? {
268 return Err(RuntimeError::SessionNotFound {
269 session_id: session_id.to_string(),
270 });
271 }
272
273 let events = self.event_store.load_events(session_id).await?;
274
275 let mut state = AppState::new(session_id);
276 for (_, event) in &events {
277 apply_event_to_state(&mut state, event);
278 }
279
280 if let Some(config) = state.session_config.clone() {
281 let mut resolved = resolve_effective_config(&config);
282 let system_context = self.resolve_system_context(&resolved).await;
283 if let Some(context) = &system_context {
284 resolved.system_prompt = Some(context.prompt.clone());
285 }
286 state.apply_session_config(&resolved, resolved.primary_agent_id.clone(), false);
287 state.cached_system_context = system_context;
288 }
289
290 let should_drain_queue = !state.has_active_operation() && !state.queued_work.is_empty();
291 let handle = spawn_session_actor(
292 session_id,
293 state,
294 self.event_store.clone(),
295 self.api_client.clone(),
296 self.tool_executor.clone(),
297 );
298
299 if should_drain_queue {
300 handle
301 .dispatch(Action::DrainQueuedWork { session_id })
302 .await?;
303 }
304
305 self.sessions.insert(session_id, handle);
306
307 tracing::info!(
308 session_id = %session_id,
309 event_count = events.len(),
310 "Resumed session"
311 );
312
313 Ok(())
314 }
315
316 async fn resolve_system_context(
317 &self,
318 config: &SessionConfig,
319 ) -> Option<crate::app::SystemContext> {
320 let prompt = config
321 .system_prompt
322 .as_ref()
323 .and_then(|prompt| {
324 if prompt.trim().is_empty() {
325 None
326 } else {
327 Some(prompt.clone())
328 }
329 })
330 .unwrap_or_else(|| system_prompt_for_model(&config.default_model));
331
332 let workspace = match self.tool_executor.workspace() {
333 Some(workspace) => workspace,
334 None => return Some(crate::app::SystemContext::new(prompt)),
335 };
336
337 let environment = match workspace.environment().await {
338 Ok(env_info) => Some(env_info),
339 Err(error) => {
340 warn!(error = %error, "Failed to collect environment info for system context");
341 None
342 }
343 };
344
345 Some(crate::app::SystemContext::with_environment(
346 prompt,
347 environment,
348 ))
349 }
350
351 async fn suspend_session(&mut self, session_id: SessionId) -> Result<(), RuntimeError> {
352 if let Some(handle) = self.sessions.remove(&session_id) {
353 let _ = handle.suspend().await;
354 tracing::info!(session_id = %session_id, "Suspended session");
355 }
356 Ok(())
357 }
358
359 async fn delete_session(&mut self, session_id: SessionId) -> Result<(), RuntimeError> {
360 if let Some(handle) = self.sessions.remove(&session_id) {
361 handle.shutdown();
362 }
363
364 self.event_store.delete_session(session_id).await?;
365
366 tracing::info!(session_id = %session_id, "Deleted session");
367
368 Ok(())
369 }
370
371 async fn dispatch_action(
372 &mut self,
373 session_id: SessionId,
374 action: Action,
375 ) -> Result<(), RuntimeError> {
376 if !self.sessions.contains_key(&session_id) {
377 self.resume_session(session_id).await?;
378 }
379
380 let handle =
381 self.sessions
382 .get(&session_id)
383 .ok_or_else(|| RuntimeError::SessionNotFound {
384 session_id: session_id.to_string(),
385 })?;
386
387 handle.dispatch(action).await?;
388
389 Ok(())
390 }
391
392 async fn subscribe_events(
393 &mut self,
394 session_id: SessionId,
395 ) -> Result<SessionEventSubscription, RuntimeError> {
396 if !self.sessions.contains_key(&session_id) {
397 self.resume_session(session_id).await?;
398 }
399
400 let handle =
401 self.sessions
402 .get(&session_id)
403 .ok_or_else(|| RuntimeError::SessionNotFound {
404 session_id: session_id.to_string(),
405 })?;
406
407 let subscription = handle.subscribe().await?;
408
409 Ok(subscription)
410 }
411
412 async fn subscribe_deltas(
413 &mut self,
414 session_id: SessionId,
415 ) -> Result<broadcast::Receiver<StreamDelta>, RuntimeError> {
416 if !self.sessions.contains_key(&session_id) {
417 self.resume_session(session_id).await?;
418 }
419
420 let handle =
421 self.sessions
422 .get(&session_id)
423 .ok_or_else(|| RuntimeError::SessionNotFound {
424 session_id: session_id.to_string(),
425 })?;
426
427 let delta_rx = handle.subscribe_deltas().await?;
428
429 Ok(delta_rx)
430 }
431
432 async fn get_session_state(&mut self, session_id: SessionId) -> Result<AppState, RuntimeError> {
433 if !self.sessions.contains_key(&session_id) {
434 self.resume_session(session_id).await?;
435 }
436
437 let handle =
438 self.sessions
439 .get(&session_id)
440 .ok_or_else(|| RuntimeError::SessionNotFound {
441 session_id: session_id.to_string(),
442 })?;
443
444 let state = handle.get_state().await?;
445
446 Ok(state)
447 }
448
449 async fn shutdown_all(&mut self) {
450 for (session_id, handle) in self.sessions.drain() {
451 handle.shutdown();
452 tracing::debug!(session_id = %session_id, "Shutting down session");
453 }
454 }
455}
456
457#[derive(Clone)]
458pub struct RuntimeHandle {
459 tx: mpsc::Sender<SupervisorCmd>,
460}
461
462impl RuntimeHandle {
463 pub async fn create_session(&self, config: SessionConfig) -> Result<SessionId, RuntimeError> {
464 let (reply_tx, reply_rx) = oneshot::channel();
465 self.tx
466 .send(SupervisorCmd::CreateSession {
467 config: Box::new(config),
468 reply: reply_tx,
469 })
470 .await
471 .map_err(|_| RuntimeError::ChannelClosed)?;
472 reply_rx.await.map_err(|_| RuntimeError::ChannelClosed)?
473 }
474
475 pub async fn resume_session(&self, session_id: SessionId) -> Result<(), RuntimeError> {
476 let (reply_tx, reply_rx) = oneshot::channel();
477 self.tx
478 .send(SupervisorCmd::ResumeSession {
479 session_id,
480 reply: reply_tx,
481 })
482 .await
483 .map_err(|_| RuntimeError::ChannelClosed)?;
484 reply_rx.await.map_err(|_| RuntimeError::ChannelClosed)?
485 }
486
487 pub async fn suspend_session(&self, session_id: SessionId) -> Result<(), RuntimeError> {
488 let (reply_tx, reply_rx) = oneshot::channel();
489 self.tx
490 .send(SupervisorCmd::SuspendSession {
491 session_id,
492 reply: reply_tx,
493 })
494 .await
495 .map_err(|_| RuntimeError::ChannelClosed)?;
496 reply_rx.await.map_err(|_| RuntimeError::ChannelClosed)?
497 }
498
499 pub async fn delete_session(&self, session_id: SessionId) -> Result<(), RuntimeError> {
500 let (reply_tx, reply_rx) = oneshot::channel();
501 self.tx
502 .send(SupervisorCmd::DeleteSession {
503 session_id,
504 reply: reply_tx,
505 })
506 .await
507 .map_err(|_| RuntimeError::ChannelClosed)?;
508 reply_rx.await.map_err(|_| RuntimeError::ChannelClosed)?
509 }
510
511 pub async fn dispatch_action(
512 &self,
513 session_id: SessionId,
514 action: Action,
515 ) -> Result<(), RuntimeError> {
516 let (reply_tx, reply_rx) = oneshot::channel();
517 self.tx
518 .send(SupervisorCmd::DispatchAction {
519 session_id,
520 action: Box::new(action),
521 reply: reply_tx,
522 })
523 .await
524 .map_err(|_| RuntimeError::ChannelClosed)?;
525 reply_rx.await.map_err(|_| RuntimeError::ChannelClosed)?
526 }
527
528 pub async fn subscribe_events(
529 &self,
530 session_id: SessionId,
531 ) -> Result<SessionEventSubscription, RuntimeError> {
532 let (reply_tx, reply_rx) = oneshot::channel();
533 self.tx
534 .send(SupervisorCmd::SubscribeEvents {
535 session_id,
536 reply: reply_tx,
537 })
538 .await
539 .map_err(|_| RuntimeError::ChannelClosed)?;
540 reply_rx.await.map_err(|_| RuntimeError::ChannelClosed)?
541 }
542
543 pub async fn subscribe_deltas(
544 &self,
545 session_id: SessionId,
546 ) -> Result<broadcast::Receiver<StreamDelta>, RuntimeError> {
547 let (reply_tx, reply_rx) = oneshot::channel();
548 self.tx
549 .send(SupervisorCmd::SubscribeDeltas {
550 session_id,
551 reply: reply_tx,
552 })
553 .await
554 .map_err(|_| RuntimeError::ChannelClosed)?;
555 reply_rx.await.map_err(|_| RuntimeError::ChannelClosed)?
556 }
557
558 pub async fn load_events_after(
559 &self,
560 session_id: SessionId,
561 after_seq: u64,
562 ) -> Result<Vec<(u64, SessionEvent)>, RuntimeError> {
563 let (reply_tx, reply_rx) = oneshot::channel();
564 self.tx
565 .send(SupervisorCmd::LoadEventsAfter {
566 session_id,
567 after_seq,
568 reply: reply_tx,
569 })
570 .await
571 .map_err(|_| RuntimeError::ChannelClosed)?;
572 reply_rx.await.map_err(|_| RuntimeError::ChannelClosed)?
573 }
574
575 pub async fn get_session_state(&self, session_id: SessionId) -> Result<AppState, RuntimeError> {
576 let (reply_tx, reply_rx) = oneshot::channel();
577 self.tx
578 .send(SupervisorCmd::GetSessionState {
579 session_id,
580 reply: reply_tx,
581 })
582 .await
583 .map_err(|_| RuntimeError::ChannelClosed)?;
584 reply_rx.await.map_err(|_| RuntimeError::ChannelClosed)?
585 }
586
587 pub async fn is_session_active(&self, session_id: SessionId) -> Result<bool, RuntimeError> {
588 let (reply_tx, reply_rx) = oneshot::channel();
589 self.tx
590 .send(SupervisorCmd::IsSessionActive {
591 session_id,
592 reply: reply_tx,
593 })
594 .await
595 .map_err(|_| RuntimeError::ChannelClosed)?;
596 reply_rx.await.map_err(|_| RuntimeError::ChannelClosed)
597 }
598
599 pub async fn list_active_sessions(&self) -> Result<Vec<SessionId>, RuntimeError> {
600 let (reply_tx, reply_rx) = oneshot::channel();
601 self.tx
602 .send(SupervisorCmd::ListActiveSessions { reply: reply_tx })
603 .await
604 .map_err(|_| RuntimeError::ChannelClosed)?;
605 reply_rx.await.map_err(|_| RuntimeError::ChannelClosed)
606 }
607
608 pub async fn submit_user_input(
609 &self,
610 session_id: SessionId,
611 text: String,
612 model: ModelId,
613 ) -> Result<OpId, RuntimeError> {
614 let text = NonEmptyString::new(text).ok_or_else(|| RuntimeError::InvalidInput {
615 message: "Input text cannot be empty".to_string(),
616 })?;
617
618 let op_id = OpId::new();
619 let message_id = MessageId::new();
620 let timestamp = current_timestamp();
621
622 let action = Action::UserInput {
623 session_id,
624 text,
625 op_id,
626 message_id,
627 model,
628 timestamp,
629 };
630
631 self.dispatch_action(session_id, action).await?;
632
633 Ok(op_id)
634 }
635
636 pub async fn submit_tool_approval(
637 &self,
638 session_id: SessionId,
639 request_id: RequestId,
640 approved: bool,
641 remember: Option<crate::app::domain::action::ApprovalMemory>,
642 ) -> Result<(), RuntimeError> {
643 use crate::app::domain::action::ApprovalDecision;
644
645 let decision = if approved {
646 ApprovalDecision::Approved
647 } else {
648 ApprovalDecision::Denied
649 };
650
651 let action = Action::ToolApprovalDecided {
652 session_id,
653 request_id,
654 decision,
655 remember,
656 };
657
658 self.dispatch_action(session_id, action).await
659 }
660
661 pub async fn switch_primary_agent(
662 &self,
663 session_id: SessionId,
664 agent_id: String,
665 ) -> Result<(), RuntimeError> {
666 let action = Action::SwitchPrimaryAgent {
667 session_id,
668 agent_id,
669 };
670 self.dispatch_action(session_id, action).await
671 }
672
673 pub async fn cancel_operation(
674 &self,
675 session_id: SessionId,
676 op_id: Option<OpId>,
677 ) -> Result<(), RuntimeError> {
678 let action = Action::Cancel { session_id, op_id };
679 self.dispatch_action(session_id, action).await
680 }
681
682 pub async fn submit_edited_message(
683 &self,
684 session_id: SessionId,
685 original_message_id: String,
686 new_content: String,
687 model: ModelId,
688 ) -> Result<OpId, RuntimeError> {
689 let op_id = OpId::new();
690 let new_message_id = MessageId::new();
691 let timestamp = current_timestamp();
692
693 let action = Action::UserEditedMessage {
694 session_id,
695 message_id: MessageId::from_string(original_message_id),
696 new_content,
697 op_id,
698 new_message_id,
699 model,
700 timestamp,
701 };
702
703 self.dispatch_action(session_id, action).await?;
704 Ok(op_id)
705 }
706
707 pub async fn submit_dequeue_queued_item(
708 &self,
709 session_id: SessionId,
710 ) -> Result<(), RuntimeError> {
711 let state = self.get_session_state(session_id).await?;
712 if state.queued_work.is_empty() {
713 return Err(RuntimeError::InvalidInput {
714 message: "No queued item to remove".to_string(),
715 });
716 }
717
718 let action = Action::DequeueQueuedItem { session_id };
719 self.dispatch_action(session_id, action).await
720 }
721
722 pub async fn compact_session(
723 &self,
724 session_id: SessionId,
725 model: ModelId,
726 ) -> Result<OpId, RuntimeError> {
727 let op_id = OpId::new();
728
729 let action = Action::RequestCompaction {
730 session_id,
731 op_id,
732 model,
733 };
734
735 self.dispatch_action(session_id, action).await?;
736 Ok(op_id)
737 }
738
739 pub async fn execute_bash_command(
740 &self,
741 session_id: SessionId,
742 command: String,
743 ) -> Result<OpId, RuntimeError> {
744 let op_id = OpId::new();
745 let message_id = MessageId::new();
746 let timestamp = current_timestamp();
747
748 let action = Action::DirectBashCommand {
749 session_id,
750 op_id,
751 message_id,
752 command,
753 timestamp,
754 };
755
756 self.dispatch_action(session_id, action).await?;
757 Ok(op_id)
758 }
759
760 pub async fn list_all_sessions(&self) -> Result<Vec<SessionId>, RuntimeError> {
761 let (reply_tx, reply_rx) = oneshot::channel();
762 self.tx
763 .send(SupervisorCmd::ListAllSessions { reply: reply_tx })
764 .await
765 .map_err(|_| RuntimeError::ChannelClosed)?;
766 reply_rx.await.map_err(|_| RuntimeError::ChannelClosed)?
767 }
768
769 pub async fn session_exists(&self, session_id: SessionId) -> Result<bool, RuntimeError> {
770 let (reply_tx, reply_rx) = oneshot::channel();
771 self.tx
772 .send(SupervisorCmd::SessionExists {
773 session_id,
774 reply: reply_tx,
775 })
776 .await
777 .map_err(|_| RuntimeError::ChannelClosed)?;
778 reply_rx.await.map_err(|_| RuntimeError::ChannelClosed)?
779 }
780
781 pub fn shutdown(&self) {
782 let _ = self.tx.try_send(SupervisorCmd::Shutdown);
783 }
784}
785
786pub struct RuntimeService {
787 pub handle: RuntimeHandle,
788 task: JoinHandle<()>,
789}
790
791impl RuntimeService {
792 pub fn spawn(
793 event_store: Arc<dyn EventStore>,
794 api_client: Arc<ApiClient>,
795 tool_executor: Arc<ToolExecutor>,
796 ) -> Self {
797 let (tx, rx) = mpsc::channel(64);
798
799 let supervisor = RuntimeSupervisor::new(event_store, api_client, tool_executor);
800 let task = tokio::spawn(supervisor.run(rx));
801
802 let handle = RuntimeHandle { tx };
803
804 Self { handle, task }
805 }
806
807 pub fn handle(&self) -> RuntimeHandle {
808 self.handle.clone()
809 }
810
811 pub async fn shutdown(self) {
812 self.handle.shutdown();
813 let _ = self.task.await;
814 }
815}
816
817fn current_timestamp() -> u64 {
818 std::time::SystemTime::now()
819 .duration_since(std::time::UNIX_EPOCH)
820 .unwrap_or_default()
821 .as_secs()
822}
823
824#[cfg(test)]
825mod tests {
826 use super::*;
827 use crate::app::domain::session::event_store::InMemoryEventStore;
828 use crate::app::validation::ValidatorRegistry;
829 use crate::tools::BackendRegistry;
830
831 async fn create_test_deps() -> (Arc<dyn EventStore>, Arc<ApiClient>, Arc<ToolExecutor>) {
832 let event_store = Arc::new(InMemoryEventStore::new());
833 let model_registry = Arc::new(crate::model_registry::ModelRegistry::load(&[]).unwrap());
834 let provider_registry = Arc::new(crate::auth::ProviderRegistry::load(&[]).unwrap());
835 let api_client = Arc::new(ApiClient::new_with_deps(
836 crate::test_utils::test_llm_config_provider().unwrap(),
837 provider_registry,
838 model_registry,
839 ));
840
841 let tool_executor = Arc::new(ToolExecutor::with_components(
842 Arc::new(BackendRegistry::new()),
843 Arc::new(ValidatorRegistry::new()),
844 ));
845
846 (event_store, api_client, tool_executor)
847 }
848
849 fn test_session_config() -> SessionConfig {
850 SessionConfig {
851 default_model: crate::config::model::builtin::claude_sonnet_4_5(),
852 workspace: crate::session::state::WorkspaceConfig::Local {
853 path: std::env::current_dir().unwrap(),
854 },
855 workspace_ref: None,
856 workspace_id: None,
857 repo_ref: None,
858 parent_session_id: None,
859 workspace_name: None,
860 tool_config: crate::session::state::SessionToolConfig::default(),
861 system_prompt: None,
862 primary_agent_id: None,
863 policy_overrides: crate::session::state::SessionPolicyOverrides::empty(),
864 metadata: std::collections::HashMap::new(),
865 }
866 }
867
868 #[tokio::test]
869 async fn test_create_session() {
870 let (event_store, api_client, tool_executor) = create_test_deps().await;
871 let service = RuntimeService::spawn(event_store, api_client, tool_executor);
872
873 let session_id = service
874 .handle
875 .create_session(test_session_config())
876 .await
877 .unwrap();
878
879 assert!(service.handle.is_session_active(session_id).await.unwrap());
880
881 service.shutdown().await;
882 }
883
884 #[tokio::test]
885 async fn test_suspend_and_resume_session() {
886 let (event_store, api_client, tool_executor) = create_test_deps().await;
887 let service = RuntimeService::spawn(event_store, api_client, tool_executor);
888
889 let session_id = service
890 .handle
891 .create_session(test_session_config())
892 .await
893 .unwrap();
894
895 service.handle.suspend_session(session_id).await.unwrap();
896 assert!(!service.handle.is_session_active(session_id).await.unwrap());
897
898 service.handle.resume_session(session_id).await.unwrap();
899 assert!(service.handle.is_session_active(session_id).await.unwrap());
900
901 service.shutdown().await;
902 }
903
904 #[tokio::test]
905 async fn test_delete_session() {
906 let (event_store, api_client, tool_executor) = create_test_deps().await;
907 let service = RuntimeService::spawn(event_store, api_client, tool_executor);
908
909 let session_id = service
910 .handle
911 .create_session(test_session_config())
912 .await
913 .unwrap();
914
915 service.handle.delete_session(session_id).await.unwrap();
916 assert!(!service.handle.is_session_active(session_id).await.unwrap());
917
918 let result = service.handle.resume_session(session_id).await;
919 assert!(matches!(result, Err(RuntimeError::SessionNotFound { .. })));
920
921 service.shutdown().await;
922 }
923
924 #[tokio::test]
925 async fn test_subscribe_events() {
926 let (event_store, api_client, tool_executor) = create_test_deps().await;
927 let service = RuntimeService::spawn(event_store, api_client, tool_executor);
928
929 let session_id = service
930 .handle
931 .create_session(test_session_config())
932 .await
933 .unwrap();
934
935 let _subscription = service.handle.subscribe_events(session_id).await.unwrap();
936
937 service.shutdown().await;
938 }
939}