1use std::collections::HashMap;
2use std::path::PathBuf;
3use std::sync::Arc;
4use std::time::SystemTime;
5
6use futures_util::stream::{self, BoxStream, StreamExt};
7use lash_trace::{JsonlTraceSink, TraceContext, TraceLevel, TraceSink};
8use serde::{Deserialize, Serialize};
9use tokio::sync::{Mutex, broadcast};
10
11use crate::plugin::PluginError;
12
13use super::{SessionStoreFactory, TerminationPolicy};
14
15#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
17#[serde(rename_all = "snake_case")]
18pub enum BackgroundTaskKind {
19 Monitor,
20 Subagent,
21 Observer,
22 Other,
23}
24
25impl BackgroundTaskKind {
26 pub fn as_str(&self) -> &'static str {
27 match self {
28 BackgroundTaskKind::Monitor => "monitor",
29 BackgroundTaskKind::Subagent => "subagent",
30 BackgroundTaskKind::Observer => "observer",
31 BackgroundTaskKind::Other => "other",
32 }
33 }
34}
35
36pub type BackgroundTaskId = String;
37
38#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
40#[serde(rename_all = "snake_case")]
41pub enum BackgroundTaskState {
42 Pending,
43 Running,
44 Waiting,
45 Completed,
46 Failed,
47 CancelRequested,
48 Cancelled,
49}
50
51impl BackgroundTaskState {
52 pub fn is_terminal(self) -> bool {
53 matches!(self, Self::Completed | Self::Failed | Self::Cancelled)
54 }
55}
56
57#[derive(Clone, Debug)]
59pub struct BackgroundTaskRegistration {
60 pub id: BackgroundTaskId,
61 pub kind: BackgroundTaskKind,
62 pub producer: &'static str,
63 pub child_session_id: Option<String>,
64 pub parent_task_id: Option<BackgroundTaskId>,
65}
66
67#[derive(Clone, Debug, Default, Serialize, Deserialize)]
68pub struct BackgroundTaskScope {
69 pub session_id: String,
70}
71
72#[derive(Clone, Debug, Default, Serialize, Deserialize)]
73#[serde(rename_all = "snake_case")]
74pub enum BackgroundCancelPolicy {
75 #[default]
76 Cooperative,
77 AbortLocal,
78 External,
79}
80
81#[derive(Clone, Debug, Default, Serialize, Deserialize)]
82#[serde(rename_all = "snake_case")]
83pub enum BackgroundClosePolicy {
84 #[default]
85 Keep,
86 Cancel,
87 Transfer,
88}
89
90#[derive(Clone, Debug, Default, Serialize, Deserialize)]
91pub struct BackgroundTaskAttempt {
92 pub attempt: u32,
93 pub max_attempts: Option<u32>,
94 pub idempotency_key: Option<String>,
95}
96
97#[derive(Clone, Debug, Default, Serialize, Deserialize)]
98pub struct BackgroundTaskOutcome {
99 pub summary: Option<String>,
100}
101
102#[derive(Clone, Debug, Serialize, Deserialize)]
104pub struct BackgroundTaskRecord {
105 pub id: BackgroundTaskId,
106 pub kind: BackgroundTaskKind,
107 pub producer: String,
108 pub scope: BackgroundTaskScope,
109 pub parent_task_id: Option<BackgroundTaskId>,
110 pub child_session_id: Option<String>,
111 pub state: BackgroundTaskState,
112 pub cancel_policy: BackgroundCancelPolicy,
113 pub close_policy: BackgroundClosePolicy,
114 pub attempt: BackgroundTaskAttempt,
115 pub result: Option<BackgroundTaskOutcome>,
116 pub failure: Option<BackgroundTaskOutcome>,
117 pub created_at: SystemTime,
118 pub updated_at: SystemTime,
119 pub completed_at: Option<SystemTime>,
120}
121
122impl BackgroundTaskRecord {
123 pub fn local_session(
124 session_id: impl Into<String>,
125 id: impl Into<BackgroundTaskId>,
126 kind: BackgroundTaskKind,
127 producer: impl Into<String>,
128 state: BackgroundTaskState,
129 ) -> Self {
130 let now = SystemTime::now();
131 Self {
132 id: id.into(),
133 kind,
134 producer: producer.into(),
135 scope: BackgroundTaskScope {
136 session_id: session_id.into(),
137 },
138 parent_task_id: None,
139 child_session_id: None,
140 state,
141 cancel_policy: BackgroundCancelPolicy::Cooperative,
142 close_policy: BackgroundClosePolicy::Keep,
143 attempt: BackgroundTaskAttempt::default(),
144 result: None,
145 failure: None,
146 created_at: now,
147 updated_at: now,
148 completed_at: state.is_terminal().then_some(now),
149 }
150 }
151}
152
153#[derive(Clone, Debug, Serialize, Deserialize)]
154#[serde(tag = "type", rename_all = "snake_case")]
155pub enum BackgroundTaskEvent {
156 Registered {
157 record: BackgroundTaskRecord,
158 },
159 StateChanged {
160 task_id: BackgroundTaskId,
161 state: BackgroundTaskState,
162 },
163 Progress {
164 task_id: BackgroundTaskId,
165 message: String,
166 },
167 Completed {
168 record: BackgroundTaskRecord,
169 },
170 Failed {
171 record: BackgroundTaskRecord,
172 },
173 CancelRequested {
174 task_id: BackgroundTaskId,
175 reason: Option<String>,
176 },
177 Cancelled {
178 record: BackgroundTaskRecord,
179 },
180 Transferred {
181 task_id: BackgroundTaskId,
182 scope: BackgroundTaskScope,
183 },
184}
185
186#[derive(Clone, Debug, Default, Serialize, Deserialize)]
187pub struct BackgroundTaskFilter {
188 pub session_id: Option<String>,
189 pub kind: Option<BackgroundTaskKind>,
190 pub include_terminal: bool,
191}
192
193#[derive(Clone, Debug, Serialize, Deserialize)]
194pub struct BackgroundTaskRegisterRequest {
195 pub scope: BackgroundTaskScope,
196 pub id: BackgroundTaskId,
197 pub kind: BackgroundTaskKind,
198 pub producer: String,
199 pub parent_task_id: Option<BackgroundTaskId>,
200 pub child_session_id: Option<String>,
201 pub cancel_policy: BackgroundCancelPolicy,
202 pub close_policy: BackgroundClosePolicy,
203 pub attempt: BackgroundTaskAttempt,
204}
205
206#[derive(Clone, Debug, Default, Serialize, Deserialize)]
207pub struct BackgroundTaskUpdate {
208 pub state: Option<BackgroundTaskState>,
209 pub progress: Option<String>,
210}
211
212#[derive(Clone, Debug, Serialize, Deserialize)]
213pub struct BackgroundTaskCompletion {
214 pub state: BackgroundTaskState,
215 pub summary: Option<String>,
216}
217
218pub type LocalBackgroundTaskCancel =
220 Arc<dyn Fn() -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>> + Send + Sync>;
221
222#[async_trait::async_trait]
224pub trait BackgroundTaskHost: Send + Sync {
225 async fn register(
226 &self,
227 request: BackgroundTaskRegisterRequest,
228 ) -> Result<BackgroundTaskRecord, PluginError>;
229
230 async fn update(
231 &self,
232 task_id: &str,
233 update: BackgroundTaskUpdate,
234 ) -> Result<BackgroundTaskRecord, PluginError>;
235
236 async fn complete(
237 &self,
238 task_id: &str,
239 outcome: BackgroundTaskCompletion,
240 ) -> Result<BackgroundTaskRecord, PluginError>;
241
242 async fn request_cancel(
243 &self,
244 task_id: &str,
245 reason: Option<String>,
246 ) -> Result<BackgroundTaskRecord, PluginError>;
247
248 async fn get(&self, task_id: &str) -> Option<BackgroundTaskRecord>;
249
250 async fn list(&self, filter: BackgroundTaskFilter) -> Vec<BackgroundTaskRecord>;
251
252 async fn transfer(
253 &self,
254 task_id: &str,
255 new_scope: BackgroundTaskScope,
256 ) -> Result<BackgroundTaskRecord, PluginError>;
257
258 fn subscribe(&self, filter: BackgroundTaskFilter) -> BoxStream<'static, BackgroundTaskEvent>;
259
260 async fn spawn_hidden(
261 &self,
262 session_id: &str,
263 label: &str,
264 task: crate::plugin::PluginSessionTask,
265 ) -> Result<(), PluginError>;
266
267 async fn await_hidden(&self, session_id: &str) -> Result<(), PluginError>;
268
269 async fn spawn_managed(
272 &self,
273 session_id: &str,
274 spec: BackgroundTaskRegistration,
275 task: crate::plugin::PluginSessionTask,
276 ) -> Result<(), PluginError>;
277
278 async fn register_external(
283 &self,
284 session_id: &str,
285 spec: BackgroundTaskRegistration,
286 cancel: Option<LocalBackgroundTaskCancel>,
287 ) -> Result<(), PluginError>;
288
289 async fn unregister_external(&self, session_id: &str, task_id: &str);
290
291 async fn mark_terminal(&self, session_id: &str, task_id: &str, state: BackgroundTaskState);
294
295 async fn mark_live_state(&self, session_id: &str, task_id: &str, state: BackgroundTaskState);
300
301 async fn cancel_managed(&self, session_id: &str, task_id: &str) -> Result<(), PluginError>;
304
305 async fn list_managed(&self, session_id: &str) -> Vec<BackgroundTaskRecord>;
307
308 async fn get_managed(&self, session_id: &str, task_id: &str) -> Option<BackgroundTaskRecord>;
310
311 async fn transfer_managed(
313 &self,
314 from_session_id: &str,
315 to_session_id: &str,
316 task_ids: &[String],
317 ) -> Result<(), PluginError>;
318
319 async fn cancel_all_managed(
321 &self,
322 session_id: &str,
323 ) -> Result<Vec<BackgroundTaskRecord>, PluginError>;
324}
325
326pub struct LocalBackgroundTaskHost {
328 hidden: Mutex<HiddenTaskMap>,
329 managed: Arc<Mutex<ManagedTaskMap>>,
330 events: broadcast::Sender<BackgroundTaskEvent>,
331}
332
333impl Default for LocalBackgroundTaskHost {
334 fn default() -> Self {
335 let (events, _) = broadcast::channel(256);
336 Self {
337 hidden: Mutex::new(HashMap::new()),
338 managed: Arc::new(Mutex::new(HashMap::new())),
339 events,
340 }
341 }
342}
343
344type SessionTaskHandle = tokio::task::JoinHandle<Result<(), PluginError>>;
345type HiddenTaskMap = HashMap<String, Vec<SessionTaskHandle>>;
346type ManagedTaskMap = HashMap<String, HashMap<String, ManagedTaskRecord>>;
347
348struct ManagedTaskRecord {
349 status: BackgroundTaskRecord,
350 handle: Option<SessionTaskHandle>,
351 cancel: Option<LocalBackgroundTaskCancel>,
352}
353
354fn new_background_task_record(
355 scope_session_id: &str,
356 spec: &BackgroundTaskRegistration,
357 state: BackgroundTaskState,
358) -> BackgroundTaskRecord {
359 let mut record = BackgroundTaskRecord::local_session(
360 scope_session_id,
361 spec.id.clone(),
362 spec.kind,
363 spec.producer,
364 state,
365 );
366 record.parent_task_id = spec.parent_task_id.clone();
367 record.child_session_id = spec.child_session_id.clone();
368 record
369}
370
371fn event_matches_filter(event: &BackgroundTaskEvent, filter: &BackgroundTaskFilter) -> bool {
372 let record = match event {
373 BackgroundTaskEvent::Registered { record }
374 | BackgroundTaskEvent::Completed { record }
375 | BackgroundTaskEvent::Failed { record }
376 | BackgroundTaskEvent::Cancelled { record } => Some(record),
377 _ => None,
378 };
379 if let Some(record) = record {
380 if filter
381 .session_id
382 .as_ref()
383 .is_some_and(|session_id| &record.scope.session_id != session_id)
384 {
385 return false;
386 }
387 if filter.kind.is_some_and(|kind| record.kind != kind) {
388 return false;
389 }
390 return filter.include_terminal || !record.state.is_terminal();
391 }
392 true
393}
394
395fn record_matches_filter(record: &BackgroundTaskRecord, filter: &BackgroundTaskFilter) -> bool {
396 if filter
397 .session_id
398 .as_ref()
399 .is_some_and(|session_id| &record.scope.session_id != session_id)
400 {
401 return false;
402 }
403 if filter.kind.is_some_and(|kind| record.kind != kind) {
404 return false;
405 }
406 filter.include_terminal || !record.state.is_terminal()
407}
408
409impl LocalBackgroundTaskHost {
410 fn publish(&self, event: BackgroundTaskEvent) {
411 let _ = self.events.send(event);
412 }
413}
414
415#[async_trait::async_trait]
416impl BackgroundTaskHost for LocalBackgroundTaskHost {
417 async fn register(
418 &self,
419 request: BackgroundTaskRegisterRequest,
420 ) -> Result<BackgroundTaskRecord, PluginError> {
421 let mut managed = self.managed.lock().await;
422 let tasks = managed.entry(request.scope.session_id.clone()).or_default();
423 if tasks
424 .get(&request.id)
425 .is_some_and(|record| !record.status.state.is_terminal())
426 {
427 return Err(PluginError::Session(format!(
428 "background task `{}` is already registered",
429 request.id
430 )));
431 }
432 let now = SystemTime::now();
433 let record = BackgroundTaskRecord {
434 id: request.id.clone(),
435 kind: request.kind,
436 producer: request.producer,
437 scope: request.scope,
438 parent_task_id: request.parent_task_id,
439 child_session_id: request.child_session_id,
440 state: BackgroundTaskState::Pending,
441 cancel_policy: request.cancel_policy,
442 close_policy: request.close_policy,
443 attempt: request.attempt,
444 result: None,
445 failure: None,
446 created_at: now,
447 updated_at: now,
448 completed_at: None,
449 };
450 tasks.insert(
451 request.id,
452 ManagedTaskRecord {
453 status: record.clone(),
454 handle: None,
455 cancel: None,
456 },
457 );
458 drop(managed);
459 self.publish(BackgroundTaskEvent::Registered {
460 record: record.clone(),
461 });
462 Ok(record)
463 }
464
465 async fn update(
466 &self,
467 task_id: &str,
468 update: BackgroundTaskUpdate,
469 ) -> Result<BackgroundTaskRecord, PluginError> {
470 let mut managed = self.managed.lock().await;
471 for tasks in managed.values_mut() {
472 if let Some(record) = tasks.get_mut(task_id) {
473 if let Some(state) = update.state {
474 record.status.state = state;
475 record.status.updated_at = SystemTime::now();
476 }
477 let status = record.status.clone();
478 drop(managed);
479 if let Some(message) = update.progress {
480 self.publish(BackgroundTaskEvent::Progress {
481 task_id: task_id.to_string(),
482 message,
483 });
484 }
485 self.publish(BackgroundTaskEvent::StateChanged {
486 task_id: task_id.to_string(),
487 state: status.state,
488 });
489 return Ok(status);
490 }
491 }
492 Err(PluginError::Session(format!(
493 "unknown background task `{task_id}`"
494 )))
495 }
496
497 async fn complete(
498 &self,
499 task_id: &str,
500 outcome: BackgroundTaskCompletion,
501 ) -> Result<BackgroundTaskRecord, PluginError> {
502 if !outcome.state.is_terminal() {
503 return Err(PluginError::Session(
504 "background task completion must use a terminal state".to_string(),
505 ));
506 }
507 let mut managed = self.managed.lock().await;
508 for tasks in managed.values_mut() {
509 if let Some(record) = tasks.get_mut(task_id) {
510 record.status.state = outcome.state;
511 record.status.updated_at = SystemTime::now();
512 record.status.completed_at = Some(record.status.updated_at);
513 let summary = BackgroundTaskOutcome {
514 summary: outcome.summary,
515 };
516 if outcome.state == BackgroundTaskState::Failed {
517 record.status.failure = Some(summary);
518 } else {
519 record.status.result = Some(summary);
520 }
521 record.handle = None;
522 let status = record.status.clone();
523 drop(managed);
524 self.publish(match status.state {
525 BackgroundTaskState::Failed => BackgroundTaskEvent::Failed {
526 record: status.clone(),
527 },
528 BackgroundTaskState::Cancelled => BackgroundTaskEvent::Cancelled {
529 record: status.clone(),
530 },
531 _ => BackgroundTaskEvent::Completed {
532 record: status.clone(),
533 },
534 });
535 return Ok(status);
536 }
537 }
538 Err(PluginError::Session(format!(
539 "unknown background task `{task_id}`"
540 )))
541 }
542
543 async fn request_cancel(
544 &self,
545 task_id: &str,
546 reason: Option<String>,
547 ) -> Result<BackgroundTaskRecord, PluginError> {
548 let status = self
549 .update(
550 task_id,
551 BackgroundTaskUpdate {
552 state: Some(BackgroundTaskState::CancelRequested),
553 progress: None,
554 },
555 )
556 .await?;
557 self.publish(BackgroundTaskEvent::CancelRequested {
558 task_id: task_id.to_string(),
559 reason,
560 });
561 Ok(status)
562 }
563
564 async fn get(&self, task_id: &str) -> Option<BackgroundTaskRecord> {
565 let managed = self.managed.lock().await;
566 managed
567 .values()
568 .find_map(|tasks| tasks.get(task_id).map(|record| record.status.clone()))
569 }
570
571 async fn list(&self, filter: BackgroundTaskFilter) -> Vec<BackgroundTaskRecord> {
572 let managed = self.managed.lock().await;
573 let mut out = managed
574 .values()
575 .flat_map(|tasks| tasks.values())
576 .map(|record| record.status.clone())
577 .filter(|record| record_matches_filter(record, &filter))
578 .collect::<Vec<_>>();
579 out.sort_by_key(|record| record.created_at);
580 out
581 }
582
583 async fn transfer(
584 &self,
585 task_id: &str,
586 new_scope: BackgroundTaskScope,
587 ) -> Result<BackgroundTaskRecord, PluginError> {
588 let mut managed = self.managed.lock().await;
589 let mut moved = None;
590 for tasks in managed.values_mut() {
591 if let Some(record) = tasks.remove(task_id) {
592 moved = Some(record);
593 break;
594 }
595 }
596 let Some(mut record) = moved else {
597 return Err(PluginError::Session(format!(
598 "unknown background task `{task_id}`"
599 )));
600 };
601 record.status.scope = new_scope.clone();
602 record.status.updated_at = SystemTime::now();
603 let status = record.status.clone();
604 managed
605 .entry(new_scope.session_id.clone())
606 .or_default()
607 .insert(task_id.to_string(), record);
608 drop(managed);
609 self.publish(BackgroundTaskEvent::Transferred {
610 task_id: task_id.to_string(),
611 scope: new_scope,
612 });
613 Ok(status)
614 }
615
616 fn subscribe(&self, filter: BackgroundTaskFilter) -> BoxStream<'static, BackgroundTaskEvent> {
617 let rx = self.events.subscribe();
618 stream::unfold((rx, filter), |(mut rx, filter)| async move {
619 loop {
620 match rx.recv().await {
621 Ok(event) if event_matches_filter(&event, &filter) => {
622 return Some((event, (rx, filter)));
623 }
624 Ok(_) => continue,
625 Err(broadcast::error::RecvError::Lagged(_)) => continue,
626 Err(broadcast::error::RecvError::Closed) => return None,
627 }
628 }
629 })
630 .boxed()
631 }
632
633 async fn spawn_hidden(
634 &self,
635 session_id: &str,
636 _label: &str,
637 task: crate::plugin::PluginSessionTask,
638 ) -> Result<(), PluginError> {
639 let handle = tokio::spawn(task);
640 self.hidden
641 .lock()
642 .await
643 .entry(session_id.to_string())
644 .or_default()
645 .push(handle);
646 Ok(())
647 }
648
649 async fn await_hidden(&self, session_id: &str) -> Result<(), PluginError> {
650 loop {
651 let tasks = self
652 .hidden
653 .lock()
654 .await
655 .remove(session_id)
656 .unwrap_or_default();
657 if tasks.is_empty() {
658 return Ok(());
659 }
660 for task in tasks {
661 match task.await {
662 Ok(Ok(())) => {}
663 Ok(Err(err)) => return Err(err),
664 Err(err) => {
665 return Err(PluginError::Session(format!(
666 "hidden background task failed: {err}"
667 )));
668 }
669 }
670 }
671 }
672 }
673
674 async fn spawn_managed(
675 &self,
676 session_id: &str,
677 spec: BackgroundTaskRegistration,
678 task: crate::plugin::PluginSessionTask,
679 ) -> Result<(), PluginError> {
680 let mut managed = self.managed.lock().await;
681 let tasks = managed.entry(session_id.to_string()).or_default();
682 if tasks
683 .get(&spec.id)
684 .is_some_and(|record| !record.status.state.is_terminal())
685 {
686 return Err(PluginError::Session(format!(
687 "managed session task `{}` is already running",
688 spec.id
689 )));
690 }
691 let records = Arc::clone(&self.managed);
692 let session_key = session_id.to_string();
693 let task_id = spec.id.clone();
694 let handle = tokio::spawn(async move {
695 let result = task.await;
696 let terminal = match &result {
697 Ok(()) => BackgroundTaskState::Completed,
698 Err(_) => BackgroundTaskState::Failed,
699 };
700 if let Some(record) = records
701 .lock()
702 .await
703 .get_mut(&session_key)
704 .and_then(|tasks| tasks.get_mut(&task_id))
705 && !record.status.state.is_terminal()
706 {
707 record.status.state = terminal;
708 record.status.updated_at = SystemTime::now();
709 record.status.completed_at = Some(record.status.updated_at);
710 record.handle = None;
711 }
712 result
713 });
714 let record = ManagedTaskRecord {
715 status: new_background_task_record(session_id, &spec, BackgroundTaskState::Running),
716 handle: Some(handle),
717 cancel: None,
718 };
719 let status = record.status.clone();
720 tasks.insert(spec.id, record);
721 drop(managed);
722 self.publish(BackgroundTaskEvent::Registered { record: status });
723 Ok(())
724 }
725
726 async fn register_external(
727 &self,
728 session_id: &str,
729 spec: BackgroundTaskRegistration,
730 cancel: Option<LocalBackgroundTaskCancel>,
731 ) -> Result<(), PluginError> {
732 let mut managed = self.managed.lock().await;
733 let tasks = managed.entry(session_id.to_string()).or_default();
734 if tasks
735 .get(&spec.id)
736 .is_some_and(|record| !record.status.state.is_terminal())
737 {
738 return Err(PluginError::Session(format!(
739 "background task `{}` is already registered",
740 spec.id
741 )));
742 }
743 let record = ManagedTaskRecord {
744 status: new_background_task_record(session_id, &spec, BackgroundTaskState::Running),
745 handle: None,
746 cancel,
747 };
748 let status = record.status.clone();
749 tasks.insert(spec.id, record);
750 drop(managed);
751 self.publish(BackgroundTaskEvent::Registered { record: status });
752 Ok(())
753 }
754
755 async fn unregister_external(&self, session_id: &str, task_id: &str) {
756 let mut managed = self.managed.lock().await;
757 if let Some(tasks) = managed.get_mut(session_id) {
758 tasks.remove(task_id);
759 if tasks.is_empty() {
760 managed.remove(session_id);
761 }
762 }
763 }
764
765 async fn mark_terminal(&self, session_id: &str, task_id: &str, state: BackgroundTaskState) {
766 if !state.is_terminal() {
767 return;
768 }
769 let mut event = None;
770 let mut managed = self.managed.lock().await;
771 if let Some(record) = managed
772 .get_mut(session_id)
773 .and_then(|tasks| tasks.get_mut(task_id))
774 && !record.status.state.is_terminal()
775 {
776 record.status.state = state;
777 record.status.updated_at = SystemTime::now();
778 record.status.completed_at = Some(record.status.updated_at);
779 record.handle = None;
780 event = Some(match state {
781 BackgroundTaskState::Failed => BackgroundTaskEvent::Failed {
782 record: record.status.clone(),
783 },
784 BackgroundTaskState::Cancelled => BackgroundTaskEvent::Cancelled {
785 record: record.status.clone(),
786 },
787 _ => BackgroundTaskEvent::Completed {
788 record: record.status.clone(),
789 },
790 });
791 }
792 drop(managed);
793 if let Some(event) = event {
794 self.publish(event);
795 }
796 }
797
798 async fn mark_live_state(&self, session_id: &str, task_id: &str, state: BackgroundTaskState) {
799 if !matches!(
800 state,
801 BackgroundTaskState::Running | BackgroundTaskState::Waiting
802 ) {
803 return;
804 }
805 let mut event = None;
806 let mut managed = self.managed.lock().await;
807 if let Some(record) = managed
808 .get_mut(session_id)
809 .and_then(|tasks| tasks.get_mut(task_id))
810 && !record.status.state.is_terminal()
811 {
812 record.status.state = state;
813 record.status.updated_at = SystemTime::now();
814 event = Some(BackgroundTaskEvent::StateChanged {
815 task_id: task_id.to_string(),
816 state,
817 });
818 }
819 drop(managed);
820 if let Some(event) = event {
821 self.publish(event);
822 }
823 }
824
825 async fn cancel_managed(&self, session_id: &str, task_id: &str) -> Result<(), PluginError> {
826 let (handle, cancel, event) = {
827 let mut managed = self.managed.lock().await;
828 let Some(record) = managed
829 .get_mut(session_id)
830 .and_then(|tasks| tasks.get_mut(task_id))
831 else {
832 return Ok(());
833 };
834 let taken_handle = record.handle.take();
835 let taken_cancel = record.cancel.take();
836 if !record.status.state.is_terminal() {
837 record.status.state = BackgroundTaskState::Cancelled;
838 record.status.updated_at = SystemTime::now();
839 record.status.completed_at = Some(record.status.updated_at);
840 }
841 (
842 taken_handle,
843 taken_cancel,
844 BackgroundTaskEvent::Cancelled {
845 record: record.status.clone(),
846 },
847 )
848 };
849 if let Some(handle) = handle {
850 handle.abort();
851 }
852 if let Some(cancel) = cancel {
853 cancel().await;
854 }
855 self.publish(event);
856 Ok(())
857 }
858
859 async fn list_managed(&self, session_id: &str) -> Vec<BackgroundTaskRecord> {
860 let managed = self.managed.lock().await;
861 let Some(tasks) = managed.get(session_id) else {
862 return Vec::new();
863 };
864 let mut out: Vec<BackgroundTaskRecord> =
865 tasks.values().map(|record| record.status.clone()).collect();
866 out.sort_by_key(|left| left.created_at);
867 out
868 }
869
870 async fn get_managed(&self, session_id: &str, task_id: &str) -> Option<BackgroundTaskRecord> {
871 let managed = self.managed.lock().await;
872 managed
873 .get(session_id)
874 .and_then(|tasks| tasks.get(task_id))
875 .map(|record| record.status.clone())
876 }
877
878 async fn transfer_managed(
879 &self,
880 from_session_id: &str,
881 to_session_id: &str,
882 task_ids: &[String],
883 ) -> Result<(), PluginError> {
884 if from_session_id == to_session_id || task_ids.is_empty() {
885 return Ok(());
886 }
887 let mut managed = self.managed.lock().await;
888 for task_id in task_ids {
889 if managed
890 .get(to_session_id)
891 .and_then(|tasks| tasks.get(task_id))
892 .is_some_and(|record| !record.status.state.is_terminal())
893 {
894 return Err(PluginError::Session(format!(
895 "background task `{task_id}` already exists in successor session"
896 )));
897 }
898 }
899
900 let mut moved = Vec::new();
901 if let Some(from_tasks) = managed.get_mut(from_session_id) {
902 for task_id in task_ids {
903 if let Some(record) = from_tasks.remove(task_id) {
904 moved.push((task_id.clone(), record));
905 }
906 }
907 if from_tasks.is_empty() {
908 managed.remove(from_session_id);
909 }
910 }
911 if moved.is_empty() {
912 return Ok(());
913 }
914 let to_tasks = managed.entry(to_session_id.to_string()).or_default();
915 for (task_id, record) in moved {
916 to_tasks.insert(task_id.clone(), record);
917 self.publish(BackgroundTaskEvent::Transferred {
918 task_id: task_id.clone(),
919 scope: BackgroundTaskScope {
920 session_id: to_session_id.to_string(),
921 },
922 });
923 }
924 Ok(())
925 }
926
927 async fn cancel_all_managed(
928 &self,
929 session_id: &str,
930 ) -> Result<Vec<BackgroundTaskRecord>, PluginError> {
931 let live_task_ids = {
932 let managed = self.managed.lock().await;
933 managed
934 .get(session_id)
935 .map(|tasks| {
936 tasks
937 .values()
938 .filter(|record| !record.status.state.is_terminal())
939 .map(|record| record.status.id.clone())
940 .collect::<Vec<_>>()
941 })
942 .unwrap_or_default()
943 };
944 let mut out = Vec::new();
945 for task_id in live_task_ids {
946 self.cancel_managed(session_id, &task_id).await?;
947 if let Some(status) = self.get_managed(session_id, &task_id).await {
948 out.push(status);
949 }
950 }
951 Ok(out)
952 }
953}
954
955#[derive(Clone)]
957pub struct RuntimeCoreConfig {
958 pub attachment_store: Arc<dyn crate::AttachmentStore>,
959 pub prompt: crate::PromptLayer,
960 pub trace_sink: Option<Arc<dyn TraceSink>>,
961 pub trace_level: TraceLevel,
962 pub trace_context: TraceContext,
963 pub termination: TerminationPolicy,
964}
965
966impl Default for RuntimeCoreConfig {
967 fn default() -> Self {
968 Self {
969 attachment_store: Arc::new(crate::InMemoryAttachmentStore::new()),
970 prompt: crate::PromptLayer::new(),
971 trace_sink: None,
972 trace_level: TraceLevel::Standard,
973 trace_context: TraceContext::default(),
974 termination: TerminationPolicy::default(),
975 }
976 }
977}
978
979impl RuntimeCoreConfig {
980 pub fn with_attachment_store(
981 mut self,
982 attachment_store: Arc<dyn crate::AttachmentStore>,
983 ) -> Self {
984 self.attachment_store = attachment_store;
985 self
986 }
987
988 pub fn with_prompt_template(mut self, prompt_template: crate::PromptTemplate) -> Self {
989 self.prompt.template = Some(prompt_template);
990 self
991 }
992
993 pub fn with_prompt_contribution(mut self, contribution: crate::PromptContribution) -> Self {
994 self.prompt.add_contribution(contribution);
995 self
996 }
997
998 pub fn with_replaced_prompt_slot(
999 mut self,
1000 slot: crate::PromptSlot,
1001 contributions: impl IntoIterator<Item = crate::PromptContribution>,
1002 ) -> Self {
1003 self.prompt.replace_slot(slot, contributions);
1004 self
1005 }
1006
1007 pub fn with_cleared_prompt_slot(mut self, slot: crate::PromptSlot) -> Self {
1008 self.prompt.clear_slot(slot);
1009 self
1010 }
1011
1012 pub fn with_prompt_layer(mut self, prompt: crate::PromptLayer) -> Self {
1013 self.prompt = prompt;
1014 self
1015 }
1016
1017 pub fn with_trace_jsonl_path(mut self, trace_path: Option<PathBuf>) -> Self {
1018 self.trace_sink =
1019 trace_path.map(|path| Arc::new(JsonlTraceSink::new(path)) as Arc<dyn TraceSink>);
1020 self
1021 }
1022
1023 pub fn with_trace_sink(mut self, sink: Option<Arc<dyn TraceSink>>) -> Self {
1024 self.trace_sink = sink;
1025 self
1026 }
1027
1028 pub fn with_trace_level(mut self, level: TraceLevel) -> Self {
1029 self.trace_level = level;
1030 self
1031 }
1032
1033 pub fn with_trace_context(mut self, context: TraceContext) -> Self {
1034 self.trace_context = context;
1035 self
1036 }
1037
1038 pub fn with_termination(mut self, termination: TerminationPolicy) -> Self {
1039 self.termination = termination;
1040 self
1041 }
1042}
1043
1044#[derive(Clone)]
1046pub struct EmbeddedRuntimeHost {
1047 pub core: RuntimeCoreConfig,
1048 pub session_store_factory: Option<Arc<dyn SessionStoreFactory>>,
1049}
1050
1051impl EmbeddedRuntimeHost {
1052 pub fn new(core: RuntimeCoreConfig) -> Self {
1053 Self {
1054 core,
1055 session_store_factory: None,
1056 }
1057 }
1058
1059 pub fn with_session_store_factory(
1060 mut self,
1061 session_store_factory: Arc<dyn SessionStoreFactory>,
1062 ) -> Self {
1063 self.session_store_factory = Some(session_store_factory);
1064 self
1065 }
1066}
1067
1068#[derive(Clone)]
1070pub struct BackgroundRuntimeHost {
1071 pub embedded: EmbeddedRuntimeHost,
1072 pub background_task_host: Arc<dyn BackgroundTaskHost>,
1073}
1074
1075impl BackgroundRuntimeHost {
1076 pub fn new(
1077 embedded: EmbeddedRuntimeHost,
1078 background_task_host: Arc<dyn BackgroundTaskHost>,
1079 ) -> Self {
1080 Self {
1081 embedded,
1082 background_task_host,
1083 }
1084 }
1085}
1086
1087#[derive(Clone)]
1088pub(crate) struct RuntimeHost {
1089 pub core: RuntimeCoreConfig,
1090 pub session_store_factory: Option<Arc<dyn SessionStoreFactory>>,
1091 pub background_task_host: Option<Arc<dyn BackgroundTaskHost>>,
1092}
1093
1094impl From<EmbeddedRuntimeHost> for RuntimeHost {
1095 fn from(value: EmbeddedRuntimeHost) -> Self {
1096 Self {
1097 core: value.core,
1098 session_store_factory: value.session_store_factory,
1099 background_task_host: None,
1100 }
1101 }
1102}
1103
1104impl From<BackgroundRuntimeHost> for RuntimeHost {
1105 fn from(value: BackgroundRuntimeHost) -> Self {
1106 Self {
1107 core: value.embedded.core,
1108 session_store_factory: value.embedded.session_store_factory,
1109 background_task_host: Some(value.background_task_host),
1110 }
1111 }
1112}
1113
1114#[cfg(test)]
1115mod tests {
1116 use super::*;
1117 use std::sync::atomic::{AtomicUsize, Ordering};
1118
1119 fn spec(id: &str, kind: BackgroundTaskKind) -> BackgroundTaskRegistration {
1120 BackgroundTaskRegistration {
1121 id: id.to_string(),
1122 kind,
1123 producer: "test",
1124 child_session_id: None,
1125 parent_task_id: None,
1126 }
1127 }
1128
1129 #[tokio::test]
1130 async fn background_task_spawn_managed_records_metadata_and_terminates_on_exit() {
1131 let executor = LocalBackgroundTaskHost::default();
1132 executor
1133 .spawn_managed(
1134 "s1",
1135 spec("t1", BackgroundTaskKind::Monitor),
1136 Box::pin(async { Ok(()) }),
1137 )
1138 .await
1139 .expect("spawn");
1140 for _ in 0..50 {
1141 let tasks = executor.list_managed("s1").await;
1142 if tasks
1143 .iter()
1144 .all(|task| !matches!(task.state, BackgroundTaskState::Running))
1145 {
1146 break;
1147 }
1148 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
1149 }
1150 let tasks = executor.list_managed("s1").await;
1151 assert_eq!(tasks.len(), 1);
1152 assert_eq!(tasks[0].kind, BackgroundTaskKind::Monitor);
1153 assert_eq!(tasks[0].state, BackgroundTaskState::Completed);
1154 }
1155
1156 #[tokio::test]
1157 async fn background_task_mark_live_state_flips_running_and_waiting_but_preserves_terminal() {
1158 let executor = LocalBackgroundTaskHost::default();
1159 executor
1160 .register_external("s1", spec("sub", BackgroundTaskKind::Subagent), None)
1161 .await
1162 .expect("register");
1163 assert_eq!(
1164 executor.get_managed("s1", "sub").await.unwrap().state,
1165 BackgroundTaskState::Running
1166 );
1167
1168 executor
1169 .mark_live_state("s1", "sub", BackgroundTaskState::Waiting)
1170 .await;
1171 assert_eq!(
1172 executor.get_managed("s1", "sub").await.unwrap().state,
1173 BackgroundTaskState::Waiting
1174 );
1175
1176 executor
1177 .mark_live_state("s1", "sub", BackgroundTaskState::Running)
1178 .await;
1179 assert_eq!(
1180 executor.get_managed("s1", "sub").await.unwrap().state,
1181 BackgroundTaskState::Running
1182 );
1183
1184 executor
1186 .mark_terminal("s1", "sub", BackgroundTaskState::Completed)
1187 .await;
1188 executor
1189 .mark_live_state("s1", "sub", BackgroundTaskState::Running)
1190 .await;
1191 assert_eq!(
1192 executor.get_managed("s1", "sub").await.unwrap().state,
1193 BackgroundTaskState::Completed
1194 );
1195 }
1196
1197 #[tokio::test]
1198 async fn background_task_cancel_managed_fires_external_callback_and_marks_cancelled() {
1199 let executor = LocalBackgroundTaskHost::default();
1200 let calls = Arc::new(AtomicUsize::new(0));
1201 let calls_inner = Arc::clone(&calls);
1202 let cancel: LocalBackgroundTaskCancel = Arc::new(move || {
1203 let calls = Arc::clone(&calls_inner);
1204 Box::pin(async move {
1205 calls.fetch_add(1, Ordering::SeqCst);
1206 })
1207 });
1208 executor
1209 .register_external(
1210 "s1",
1211 spec("sub", BackgroundTaskKind::Subagent),
1212 Some(cancel),
1213 )
1214 .await
1215 .expect("register");
1216 executor.cancel_managed("s1", "sub").await.expect("cancel");
1217 assert_eq!(calls.load(Ordering::SeqCst), 1);
1218 let status = executor.get_managed("s1", "sub").await.expect("status");
1219 assert_eq!(status.state, BackgroundTaskState::Cancelled);
1220 }
1221
1222 #[tokio::test]
1223 async fn background_task_transfer_managed_moves_live_task_visibility() {
1224 let executor = LocalBackgroundTaskHost::default();
1225 executor
1226 .register_external("s1", spec("monitor:one", BackgroundTaskKind::Monitor), None)
1227 .await
1228 .expect("register");
1229
1230 executor
1231 .transfer_managed("s1", "s2", &["monitor:one".to_string()])
1232 .await
1233 .expect("transfer");
1234
1235 assert!(executor.list_managed("s1").await.is_empty());
1236 let tasks = executor.list_managed("s2").await;
1237 assert_eq!(tasks.len(), 1);
1238 assert_eq!(tasks[0].id, "monitor:one");
1239 assert_eq!(tasks[0].state, BackgroundTaskState::Running);
1240 }
1241
1242 #[tokio::test]
1243 async fn background_task_cancel_all_managed_cancels_each_live_task() {
1244 let executor = LocalBackgroundTaskHost::default();
1245 let calls = Arc::new(AtomicUsize::new(0));
1246 for id in ["a", "b"] {
1247 let calls_inner = Arc::clone(&calls);
1248 let cancel: LocalBackgroundTaskCancel = Arc::new(move || {
1249 let calls = Arc::clone(&calls_inner);
1250 Box::pin(async move {
1251 calls.fetch_add(1, Ordering::SeqCst);
1252 })
1253 });
1254 executor
1255 .register_external("s1", spec(id, BackgroundTaskKind::Other), Some(cancel))
1256 .await
1257 .expect("register");
1258 }
1259
1260 let statuses = executor.cancel_all_managed("s1").await.expect("cancel all");
1261
1262 assert_eq!(statuses.len(), 2);
1263 assert_eq!(calls.load(Ordering::SeqCst), 2);
1264 assert!(
1265 statuses
1266 .iter()
1267 .all(|status| status.state == BackgroundTaskState::Cancelled)
1268 );
1269 }
1270
1271 #[tokio::test]
1272 async fn background_task_host_contract_register_update_complete_and_filter() {
1273 let host = LocalBackgroundTaskHost::default();
1274 let registered = host
1275 .register(BackgroundTaskRegisterRequest {
1276 scope: BackgroundTaskScope {
1277 session_id: "s1".to_string(),
1278 },
1279 id: "task:one".to_string(),
1280 kind: BackgroundTaskKind::Other,
1281 producer: "test".to_string(),
1282 parent_task_id: None,
1283 child_session_id: Some("child".to_string()),
1284 cancel_policy: BackgroundCancelPolicy::External,
1285 close_policy: BackgroundClosePolicy::Transfer,
1286 attempt: BackgroundTaskAttempt {
1287 attempt: 1,
1288 max_attempts: Some(3),
1289 idempotency_key: Some("idem".to_string()),
1290 },
1291 })
1292 .await
1293 .expect("register");
1294
1295 assert_eq!(registered.state, BackgroundTaskState::Pending);
1296 assert_eq!(registered.child_session_id.as_deref(), Some("child"));
1297
1298 let updated = host
1299 .update(
1300 "task:one",
1301 BackgroundTaskUpdate {
1302 state: Some(BackgroundTaskState::Running),
1303 progress: Some("started".to_string()),
1304 },
1305 )
1306 .await
1307 .expect("update");
1308 assert_eq!(updated.state, BackgroundTaskState::Running);
1309
1310 assert_eq!(
1311 host.list(BackgroundTaskFilter {
1312 session_id: Some("s1".to_string()),
1313 kind: Some(BackgroundTaskKind::Other),
1314 include_terminal: false,
1315 })
1316 .await
1317 .len(),
1318 1
1319 );
1320
1321 let completed = host
1322 .complete(
1323 "task:one",
1324 BackgroundTaskCompletion {
1325 state: BackgroundTaskState::Completed,
1326 summary: Some("done".to_string()),
1327 },
1328 )
1329 .await
1330 .expect("complete");
1331 assert_eq!(completed.state, BackgroundTaskState::Completed);
1332 assert_eq!(
1333 completed
1334 .result
1335 .as_ref()
1336 .and_then(|outcome| outcome.summary.as_deref()),
1337 Some("done")
1338 );
1339 assert!(completed.completed_at.is_some());
1340 assert!(
1341 host.list(BackgroundTaskFilter {
1342 session_id: Some("s1".to_string()),
1343 kind: None,
1344 include_terminal: false,
1345 })
1346 .await
1347 .is_empty()
1348 );
1349 }
1350}