Skip to main content

lash_core/runtime/
host.rs

1use std::collections::HashMap;
2use std::path::PathBuf;
3use std::sync::Arc;
4use std::time::SystemTime;
5
6use futures_util::stream::{self, BoxStream, StreamExt};
7use lash_trace::{JsonlTraceSink, TraceContext, TraceLevel, TraceSink};
8use serde::{Deserialize, Serialize};
9use tokio::sync::{Mutex, broadcast};
10
11use crate::plugin::PluginError;
12
13use super::{SessionStoreFactory, TerminationPolicy};
14
15/// Category of a registered background task.
16#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
17#[serde(rename_all = "snake_case")]
18pub enum BackgroundTaskKind {
19    Monitor,
20    Subagent,
21    Observer,
22    Other,
23}
24
25impl BackgroundTaskKind {
26    pub fn as_str(&self) -> &'static str {
27        match self {
28            BackgroundTaskKind::Monitor => "monitor",
29            BackgroundTaskKind::Subagent => "subagent",
30            BackgroundTaskKind::Observer => "observer",
31            BackgroundTaskKind::Other => "other",
32        }
33    }
34}
35
/// Identifier of a background task; a plain owned string.
pub type BackgroundTaskId = String;
37
38/// Lifecycle state of a registered background task.
39#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
40#[serde(rename_all = "snake_case")]
41pub enum BackgroundTaskState {
42    Pending,
43    Running,
44    Waiting,
45    Completed,
46    Failed,
47    CancelRequested,
48    Cancelled,
49}
50
51impl BackgroundTaskState {
52    pub fn is_terminal(self) -> bool {
53        matches!(self, Self::Completed | Self::Failed | Self::Cancelled)
54    }
55}
56
57/// Metadata required to register a local background task.
58#[derive(Clone, Debug)]
59pub struct BackgroundTaskRegistration {
60    pub id: BackgroundTaskId,
61    pub kind: BackgroundTaskKind,
62    pub producer: &'static str,
63    pub child_session_id: Option<String>,
64    pub parent_task_id: Option<BackgroundTaskId>,
65}
66
67#[derive(Clone, Debug, Default, Serialize, Deserialize)]
68pub struct BackgroundTaskScope {
69    pub session_id: String,
70}
71
72#[derive(Clone, Debug, Default, Serialize, Deserialize)]
73#[serde(rename_all = "snake_case")]
74pub enum BackgroundCancelPolicy {
75    #[default]
76    Cooperative,
77    AbortLocal,
78    External,
79}
80
81#[derive(Clone, Debug, Default, Serialize, Deserialize)]
82#[serde(rename_all = "snake_case")]
83pub enum BackgroundClosePolicy {
84    #[default]
85    Keep,
86    Cancel,
87    Transfer,
88}
89
90#[derive(Clone, Debug, Default, Serialize, Deserialize)]
91pub struct BackgroundTaskAttempt {
92    pub attempt: u32,
93    pub max_attempts: Option<u32>,
94    pub idempotency_key: Option<String>,
95}
96
97#[derive(Clone, Debug, Default, Serialize, Deserialize)]
98pub struct BackgroundTaskOutcome {
99    pub summary: Option<String>,
100}
101
102/// Serializable host-owned background task record.
103#[derive(Clone, Debug, Serialize, Deserialize)]
104pub struct BackgroundTaskRecord {
105    pub id: BackgroundTaskId,
106    pub kind: BackgroundTaskKind,
107    pub producer: String,
108    pub scope: BackgroundTaskScope,
109    pub parent_task_id: Option<BackgroundTaskId>,
110    pub child_session_id: Option<String>,
111    pub state: BackgroundTaskState,
112    pub cancel_policy: BackgroundCancelPolicy,
113    pub close_policy: BackgroundClosePolicy,
114    pub attempt: BackgroundTaskAttempt,
115    pub result: Option<BackgroundTaskOutcome>,
116    pub failure: Option<BackgroundTaskOutcome>,
117    pub created_at: SystemTime,
118    pub updated_at: SystemTime,
119    pub completed_at: Option<SystemTime>,
120}
121
122impl BackgroundTaskRecord {
123    pub fn local_session(
124        session_id: impl Into<String>,
125        id: impl Into<BackgroundTaskId>,
126        kind: BackgroundTaskKind,
127        producer: impl Into<String>,
128        state: BackgroundTaskState,
129    ) -> Self {
130        let now = SystemTime::now();
131        Self {
132            id: id.into(),
133            kind,
134            producer: producer.into(),
135            scope: BackgroundTaskScope {
136                session_id: session_id.into(),
137            },
138            parent_task_id: None,
139            child_session_id: None,
140            state,
141            cancel_policy: BackgroundCancelPolicy::Cooperative,
142            close_policy: BackgroundClosePolicy::Keep,
143            attempt: BackgroundTaskAttempt::default(),
144            result: None,
145            failure: None,
146            created_at: now,
147            updated_at: now,
148            completed_at: state.is_terminal().then_some(now),
149        }
150    }
151}
152
153#[derive(Clone, Debug, Serialize, Deserialize)]
154#[serde(tag = "type", rename_all = "snake_case")]
155pub enum BackgroundTaskEvent {
156    Registered {
157        record: BackgroundTaskRecord,
158    },
159    StateChanged {
160        task_id: BackgroundTaskId,
161        state: BackgroundTaskState,
162    },
163    Progress {
164        task_id: BackgroundTaskId,
165        message: String,
166    },
167    Completed {
168        record: BackgroundTaskRecord,
169    },
170    Failed {
171        record: BackgroundTaskRecord,
172    },
173    CancelRequested {
174        task_id: BackgroundTaskId,
175        reason: Option<String>,
176    },
177    Cancelled {
178        record: BackgroundTaskRecord,
179    },
180    Transferred {
181        task_id: BackgroundTaskId,
182        scope: BackgroundTaskScope,
183    },
184}
185
186#[derive(Clone, Debug, Default, Serialize, Deserialize)]
187pub struct BackgroundTaskFilter {
188    pub session_id: Option<String>,
189    pub kind: Option<BackgroundTaskKind>,
190    pub include_terminal: bool,
191}
192
193#[derive(Clone, Debug, Serialize, Deserialize)]
194pub struct BackgroundTaskRegisterRequest {
195    pub scope: BackgroundTaskScope,
196    pub id: BackgroundTaskId,
197    pub kind: BackgroundTaskKind,
198    pub producer: String,
199    pub parent_task_id: Option<BackgroundTaskId>,
200    pub child_session_id: Option<String>,
201    pub cancel_policy: BackgroundCancelPolicy,
202    pub close_policy: BackgroundClosePolicy,
203    pub attempt: BackgroundTaskAttempt,
204}
205
206#[derive(Clone, Debug, Default, Serialize, Deserialize)]
207pub struct BackgroundTaskUpdate {
208    pub state: Option<BackgroundTaskState>,
209    pub progress: Option<String>,
210}
211
212#[derive(Clone, Debug, Serialize, Deserialize)]
213pub struct BackgroundTaskCompletion {
214    pub state: BackgroundTaskState,
215    pub summary: Option<String>,
216}
217
218/// Local-only cancellation hook for in-process tasks such as subagent trees.
219pub type LocalBackgroundTaskCancel =
220    Arc<dyn Fn() -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>> + Send + Sync>;
221
222/// Host-owned background task lifecycle policy.
223#[async_trait::async_trait]
224pub trait BackgroundTaskHost: Send + Sync {
225    async fn register(
226        &self,
227        request: BackgroundTaskRegisterRequest,
228    ) -> Result<BackgroundTaskRecord, PluginError>;
229
230    async fn update(
231        &self,
232        task_id: &str,
233        update: BackgroundTaskUpdate,
234    ) -> Result<BackgroundTaskRecord, PluginError>;
235
236    async fn complete(
237        &self,
238        task_id: &str,
239        outcome: BackgroundTaskCompletion,
240    ) -> Result<BackgroundTaskRecord, PluginError>;
241
242    async fn request_cancel(
243        &self,
244        task_id: &str,
245        reason: Option<String>,
246    ) -> Result<BackgroundTaskRecord, PluginError>;
247
248    async fn get(&self, task_id: &str) -> Option<BackgroundTaskRecord>;
249
250    async fn list(&self, filter: BackgroundTaskFilter) -> Vec<BackgroundTaskRecord>;
251
252    async fn transfer(
253        &self,
254        task_id: &str,
255        new_scope: BackgroundTaskScope,
256    ) -> Result<BackgroundTaskRecord, PluginError>;
257
258    fn subscribe(&self, filter: BackgroundTaskFilter) -> BoxStream<'static, BackgroundTaskEvent>;
259
260    async fn spawn_hidden(
261        &self,
262        session_id: &str,
263        label: &str,
264        task: crate::plugin::PluginSessionTask,
265    ) -> Result<(), PluginError>;
266
267    async fn await_hidden(&self, session_id: &str) -> Result<(), PluginError>;
268
269    /// Spawn a tokio-backed background task and register it under `spec.id`.
270    /// Local host owns the Tokio future and can abort it via `cancel_managed`.
271    async fn spawn_managed(
272        &self,
273        session_id: &str,
274        spec: BackgroundTaskRegistration,
275        task: crate::plugin::PluginSessionTask,
276    ) -> Result<(), PluginError>;
277
278    /// Register an externally-owned task (e.g. a subagent session) so it
279    /// shows up in `list_managed`. The local host does not hold a `JoinHandle`;
280    /// if `cancel` is supplied it will be invoked when the registry receives
281    /// a cancel request, otherwise cancellation only updates bookkeeping.
282    async fn register_external(
283        &self,
284        session_id: &str,
285        spec: BackgroundTaskRegistration,
286        cancel: Option<LocalBackgroundTaskCancel>,
287    ) -> Result<(), PluginError>;
288
289    async fn unregister_external(&self, session_id: &str, task_id: &str);
290
291    /// Update the state of a registered task to a terminal value.
292    /// No-op if the task is unknown or already terminal.
293    async fn mark_terminal(&self, session_id: &str, task_id: &str, state: BackgroundTaskState);
294
295    /// Transition a still-live task between the non-terminal `Running`
296    /// and `Waiting` states. Used by subagents to reflect whether the
297    /// session is actively working on a task or waiting for a
298    /// follow-up. No-op if the task is unknown or already terminal.
299    async fn mark_live_state(&self, session_id: &str, task_id: &str, state: BackgroundTaskState);
300
301    /// Cancel a tokio-backed task by id. No-op for externally-owned entries;
302    /// callers should also mark those terminal via `mark_terminal`.
303    async fn cancel_managed(&self, session_id: &str, task_id: &str) -> Result<(), PluginError>;
304
305    /// Read-only snapshot of every registered task for the session.
306    async fn list_managed(&self, session_id: &str) -> Vec<BackgroundTaskRecord>;
307
308    /// Look up a single task's metadata.
309    async fn get_managed(&self, session_id: &str, task_id: &str) -> Option<BackgroundTaskRecord>;
310
311    /// Move still-registered tasks from one session scope to another.
312    async fn transfer_managed(
313        &self,
314        from_session_id: &str,
315        to_session_id: &str,
316        task_ids: &[String],
317    ) -> Result<(), PluginError>;
318
319    /// Cancel every live task in a session scope and return updated snapshots.
320    async fn cancel_all_managed(
321        &self,
322        session_id: &str,
323    ) -> Result<Vec<BackgroundTaskRecord>, PluginError>;
324}
325
326/// Tokio-backed background task host shared across runtime sessions.
327pub struct LocalBackgroundTaskHost {
328    hidden: Mutex<HiddenTaskMap>,
329    managed: Arc<Mutex<ManagedTaskMap>>,
330    events: broadcast::Sender<BackgroundTaskEvent>,
331}
332
333impl Default for LocalBackgroundTaskHost {
334    fn default() -> Self {
335        let (events, _) = broadcast::channel(256);
336        Self {
337            hidden: Mutex::new(HashMap::new()),
338            managed: Arc::new(Mutex::new(HashMap::new())),
339            events,
340        }
341    }
342}
343
344type SessionTaskHandle = tokio::task::JoinHandle<Result<(), PluginError>>;
345type HiddenTaskMap = HashMap<String, Vec<SessionTaskHandle>>;
346type ManagedTaskMap = HashMap<String, HashMap<String, ManagedTaskRecord>>;
347
348struct ManagedTaskRecord {
349    status: BackgroundTaskRecord,
350    handle: Option<SessionTaskHandle>,
351    cancel: Option<LocalBackgroundTaskCancel>,
352}
353
354fn new_background_task_record(
355    scope_session_id: &str,
356    spec: &BackgroundTaskRegistration,
357    state: BackgroundTaskState,
358) -> BackgroundTaskRecord {
359    let mut record = BackgroundTaskRecord::local_session(
360        scope_session_id,
361        spec.id.clone(),
362        spec.kind,
363        spec.producer,
364        state,
365    );
366    record.parent_task_id = spec.parent_task_id.clone();
367    record.child_session_id = spec.child_session_id.clone();
368    record
369}
370
371fn event_matches_filter(event: &BackgroundTaskEvent, filter: &BackgroundTaskFilter) -> bool {
372    let record = match event {
373        BackgroundTaskEvent::Registered { record }
374        | BackgroundTaskEvent::Completed { record }
375        | BackgroundTaskEvent::Failed { record }
376        | BackgroundTaskEvent::Cancelled { record } => Some(record),
377        _ => None,
378    };
379    if let Some(record) = record {
380        if filter
381            .session_id
382            .as_ref()
383            .is_some_and(|session_id| &record.scope.session_id != session_id)
384        {
385            return false;
386        }
387        if filter.kind.is_some_and(|kind| record.kind != kind) {
388            return false;
389        }
390        return filter.include_terminal || !record.state.is_terminal();
391    }
392    true
393}
394
395fn record_matches_filter(record: &BackgroundTaskRecord, filter: &BackgroundTaskFilter) -> bool {
396    if filter
397        .session_id
398        .as_ref()
399        .is_some_and(|session_id| &record.scope.session_id != session_id)
400    {
401        return false;
402    }
403    if filter.kind.is_some_and(|kind| record.kind != kind) {
404        return false;
405    }
406    filter.include_terminal || !record.state.is_terminal()
407}
408
409impl LocalBackgroundTaskHost {
410    fn publish(&self, event: BackgroundTaskEvent) {
411        let _ = self.events.send(event);
412    }
413}
414
415#[async_trait::async_trait]
416impl BackgroundTaskHost for LocalBackgroundTaskHost {
417    async fn register(
418        &self,
419        request: BackgroundTaskRegisterRequest,
420    ) -> Result<BackgroundTaskRecord, PluginError> {
421        let mut managed = self.managed.lock().await;
422        let tasks = managed.entry(request.scope.session_id.clone()).or_default();
423        if tasks
424            .get(&request.id)
425            .is_some_and(|record| !record.status.state.is_terminal())
426        {
427            return Err(PluginError::Session(format!(
428                "background task `{}` is already registered",
429                request.id
430            )));
431        }
432        let now = SystemTime::now();
433        let record = BackgroundTaskRecord {
434            id: request.id.clone(),
435            kind: request.kind,
436            producer: request.producer,
437            scope: request.scope,
438            parent_task_id: request.parent_task_id,
439            child_session_id: request.child_session_id,
440            state: BackgroundTaskState::Pending,
441            cancel_policy: request.cancel_policy,
442            close_policy: request.close_policy,
443            attempt: request.attempt,
444            result: None,
445            failure: None,
446            created_at: now,
447            updated_at: now,
448            completed_at: None,
449        };
450        tasks.insert(
451            request.id,
452            ManagedTaskRecord {
453                status: record.clone(),
454                handle: None,
455                cancel: None,
456            },
457        );
458        drop(managed);
459        self.publish(BackgroundTaskEvent::Registered {
460            record: record.clone(),
461        });
462        Ok(record)
463    }
464
465    async fn update(
466        &self,
467        task_id: &str,
468        update: BackgroundTaskUpdate,
469    ) -> Result<BackgroundTaskRecord, PluginError> {
470        let mut managed = self.managed.lock().await;
471        for tasks in managed.values_mut() {
472            if let Some(record) = tasks.get_mut(task_id) {
473                if let Some(state) = update.state {
474                    record.status.state = state;
475                    record.status.updated_at = SystemTime::now();
476                }
477                let status = record.status.clone();
478                drop(managed);
479                if let Some(message) = update.progress {
480                    self.publish(BackgroundTaskEvent::Progress {
481                        task_id: task_id.to_string(),
482                        message,
483                    });
484                }
485                self.publish(BackgroundTaskEvent::StateChanged {
486                    task_id: task_id.to_string(),
487                    state: status.state,
488                });
489                return Ok(status);
490            }
491        }
492        Err(PluginError::Session(format!(
493            "unknown background task `{task_id}`"
494        )))
495    }
496
497    async fn complete(
498        &self,
499        task_id: &str,
500        outcome: BackgroundTaskCompletion,
501    ) -> Result<BackgroundTaskRecord, PluginError> {
502        if !outcome.state.is_terminal() {
503            return Err(PluginError::Session(
504                "background task completion must use a terminal state".to_string(),
505            ));
506        }
507        let mut managed = self.managed.lock().await;
508        for tasks in managed.values_mut() {
509            if let Some(record) = tasks.get_mut(task_id) {
510                record.status.state = outcome.state;
511                record.status.updated_at = SystemTime::now();
512                record.status.completed_at = Some(record.status.updated_at);
513                let summary = BackgroundTaskOutcome {
514                    summary: outcome.summary,
515                };
516                if outcome.state == BackgroundTaskState::Failed {
517                    record.status.failure = Some(summary);
518                } else {
519                    record.status.result = Some(summary);
520                }
521                record.handle = None;
522                let status = record.status.clone();
523                drop(managed);
524                self.publish(match status.state {
525                    BackgroundTaskState::Failed => BackgroundTaskEvent::Failed {
526                        record: status.clone(),
527                    },
528                    BackgroundTaskState::Cancelled => BackgroundTaskEvent::Cancelled {
529                        record: status.clone(),
530                    },
531                    _ => BackgroundTaskEvent::Completed {
532                        record: status.clone(),
533                    },
534                });
535                return Ok(status);
536            }
537        }
538        Err(PluginError::Session(format!(
539            "unknown background task `{task_id}`"
540        )))
541    }
542
543    async fn request_cancel(
544        &self,
545        task_id: &str,
546        reason: Option<String>,
547    ) -> Result<BackgroundTaskRecord, PluginError> {
548        let status = self
549            .update(
550                task_id,
551                BackgroundTaskUpdate {
552                    state: Some(BackgroundTaskState::CancelRequested),
553                    progress: None,
554                },
555            )
556            .await?;
557        self.publish(BackgroundTaskEvent::CancelRequested {
558            task_id: task_id.to_string(),
559            reason,
560        });
561        Ok(status)
562    }
563
564    async fn get(&self, task_id: &str) -> Option<BackgroundTaskRecord> {
565        let managed = self.managed.lock().await;
566        managed
567            .values()
568            .find_map(|tasks| tasks.get(task_id).map(|record| record.status.clone()))
569    }
570
571    async fn list(&self, filter: BackgroundTaskFilter) -> Vec<BackgroundTaskRecord> {
572        let managed = self.managed.lock().await;
573        let mut out = managed
574            .values()
575            .flat_map(|tasks| tasks.values())
576            .map(|record| record.status.clone())
577            .filter(|record| record_matches_filter(record, &filter))
578            .collect::<Vec<_>>();
579        out.sort_by_key(|record| record.created_at);
580        out
581    }
582
583    async fn transfer(
584        &self,
585        task_id: &str,
586        new_scope: BackgroundTaskScope,
587    ) -> Result<BackgroundTaskRecord, PluginError> {
588        let mut managed = self.managed.lock().await;
589        let mut moved = None;
590        for tasks in managed.values_mut() {
591            if let Some(record) = tasks.remove(task_id) {
592                moved = Some(record);
593                break;
594            }
595        }
596        let Some(mut record) = moved else {
597            return Err(PluginError::Session(format!(
598                "unknown background task `{task_id}`"
599            )));
600        };
601        record.status.scope = new_scope.clone();
602        record.status.updated_at = SystemTime::now();
603        let status = record.status.clone();
604        managed
605            .entry(new_scope.session_id.clone())
606            .or_default()
607            .insert(task_id.to_string(), record);
608        drop(managed);
609        self.publish(BackgroundTaskEvent::Transferred {
610            task_id: task_id.to_string(),
611            scope: new_scope,
612        });
613        Ok(status)
614    }
615
616    fn subscribe(&self, filter: BackgroundTaskFilter) -> BoxStream<'static, BackgroundTaskEvent> {
617        let rx = self.events.subscribe();
618        stream::unfold((rx, filter), |(mut rx, filter)| async move {
619            loop {
620                match rx.recv().await {
621                    Ok(event) if event_matches_filter(&event, &filter) => {
622                        return Some((event, (rx, filter)));
623                    }
624                    Ok(_) => continue,
625                    Err(broadcast::error::RecvError::Lagged(_)) => continue,
626                    Err(broadcast::error::RecvError::Closed) => return None,
627                }
628            }
629        })
630        .boxed()
631    }
632
633    async fn spawn_hidden(
634        &self,
635        session_id: &str,
636        _label: &str,
637        task: crate::plugin::PluginSessionTask,
638    ) -> Result<(), PluginError> {
639        let handle = tokio::spawn(task);
640        self.hidden
641            .lock()
642            .await
643            .entry(session_id.to_string())
644            .or_default()
645            .push(handle);
646        Ok(())
647    }
648
649    async fn await_hidden(&self, session_id: &str) -> Result<(), PluginError> {
650        loop {
651            let tasks = self
652                .hidden
653                .lock()
654                .await
655                .remove(session_id)
656                .unwrap_or_default();
657            if tasks.is_empty() {
658                return Ok(());
659            }
660            for task in tasks {
661                match task.await {
662                    Ok(Ok(())) => {}
663                    Ok(Err(err)) => return Err(err),
664                    Err(err) => {
665                        return Err(PluginError::Session(format!(
666                            "hidden background task failed: {err}"
667                        )));
668                    }
669                }
670            }
671        }
672    }
673
674    async fn spawn_managed(
675        &self,
676        session_id: &str,
677        spec: BackgroundTaskRegistration,
678        task: crate::plugin::PluginSessionTask,
679    ) -> Result<(), PluginError> {
680        let mut managed = self.managed.lock().await;
681        let tasks = managed.entry(session_id.to_string()).or_default();
682        if tasks
683            .get(&spec.id)
684            .is_some_and(|record| !record.status.state.is_terminal())
685        {
686            return Err(PluginError::Session(format!(
687                "managed session task `{}` is already running",
688                spec.id
689            )));
690        }
691        let records = Arc::clone(&self.managed);
692        let session_key = session_id.to_string();
693        let task_id = spec.id.clone();
694        let handle = tokio::spawn(async move {
695            let result = task.await;
696            let terminal = match &result {
697                Ok(()) => BackgroundTaskState::Completed,
698                Err(_) => BackgroundTaskState::Failed,
699            };
700            if let Some(record) = records
701                .lock()
702                .await
703                .get_mut(&session_key)
704                .and_then(|tasks| tasks.get_mut(&task_id))
705                && !record.status.state.is_terminal()
706            {
707                record.status.state = terminal;
708                record.status.updated_at = SystemTime::now();
709                record.status.completed_at = Some(record.status.updated_at);
710                record.handle = None;
711            }
712            result
713        });
714        let record = ManagedTaskRecord {
715            status: new_background_task_record(session_id, &spec, BackgroundTaskState::Running),
716            handle: Some(handle),
717            cancel: None,
718        };
719        let status = record.status.clone();
720        tasks.insert(spec.id, record);
721        drop(managed);
722        self.publish(BackgroundTaskEvent::Registered { record: status });
723        Ok(())
724    }
725
726    async fn register_external(
727        &self,
728        session_id: &str,
729        spec: BackgroundTaskRegistration,
730        cancel: Option<LocalBackgroundTaskCancel>,
731    ) -> Result<(), PluginError> {
732        let mut managed = self.managed.lock().await;
733        let tasks = managed.entry(session_id.to_string()).or_default();
734        if tasks
735            .get(&spec.id)
736            .is_some_and(|record| !record.status.state.is_terminal())
737        {
738            return Err(PluginError::Session(format!(
739                "background task `{}` is already registered",
740                spec.id
741            )));
742        }
743        let record = ManagedTaskRecord {
744            status: new_background_task_record(session_id, &spec, BackgroundTaskState::Running),
745            handle: None,
746            cancel,
747        };
748        let status = record.status.clone();
749        tasks.insert(spec.id, record);
750        drop(managed);
751        self.publish(BackgroundTaskEvent::Registered { record: status });
752        Ok(())
753    }
754
755    async fn unregister_external(&self, session_id: &str, task_id: &str) {
756        let mut managed = self.managed.lock().await;
757        if let Some(tasks) = managed.get_mut(session_id) {
758            tasks.remove(task_id);
759            if tasks.is_empty() {
760                managed.remove(session_id);
761            }
762        }
763    }
764
765    async fn mark_terminal(&self, session_id: &str, task_id: &str, state: BackgroundTaskState) {
766        if !state.is_terminal() {
767            return;
768        }
769        let mut event = None;
770        let mut managed = self.managed.lock().await;
771        if let Some(record) = managed
772            .get_mut(session_id)
773            .and_then(|tasks| tasks.get_mut(task_id))
774            && !record.status.state.is_terminal()
775        {
776            record.status.state = state;
777            record.status.updated_at = SystemTime::now();
778            record.status.completed_at = Some(record.status.updated_at);
779            record.handle = None;
780            event = Some(match state {
781                BackgroundTaskState::Failed => BackgroundTaskEvent::Failed {
782                    record: record.status.clone(),
783                },
784                BackgroundTaskState::Cancelled => BackgroundTaskEvent::Cancelled {
785                    record: record.status.clone(),
786                },
787                _ => BackgroundTaskEvent::Completed {
788                    record: record.status.clone(),
789                },
790            });
791        }
792        drop(managed);
793        if let Some(event) = event {
794            self.publish(event);
795        }
796    }
797
798    async fn mark_live_state(&self, session_id: &str, task_id: &str, state: BackgroundTaskState) {
799        if !matches!(
800            state,
801            BackgroundTaskState::Running | BackgroundTaskState::Waiting
802        ) {
803            return;
804        }
805        let mut event = None;
806        let mut managed = self.managed.lock().await;
807        if let Some(record) = managed
808            .get_mut(session_id)
809            .and_then(|tasks| tasks.get_mut(task_id))
810            && !record.status.state.is_terminal()
811        {
812            record.status.state = state;
813            record.status.updated_at = SystemTime::now();
814            event = Some(BackgroundTaskEvent::StateChanged {
815                task_id: task_id.to_string(),
816                state,
817            });
818        }
819        drop(managed);
820        if let Some(event) = event {
821            self.publish(event);
822        }
823    }
824
825    async fn cancel_managed(&self, session_id: &str, task_id: &str) -> Result<(), PluginError> {
826        let (handle, cancel, event) = {
827            let mut managed = self.managed.lock().await;
828            let Some(record) = managed
829                .get_mut(session_id)
830                .and_then(|tasks| tasks.get_mut(task_id))
831            else {
832                return Ok(());
833            };
834            let taken_handle = record.handle.take();
835            let taken_cancel = record.cancel.take();
836            if !record.status.state.is_terminal() {
837                record.status.state = BackgroundTaskState::Cancelled;
838                record.status.updated_at = SystemTime::now();
839                record.status.completed_at = Some(record.status.updated_at);
840            }
841            (
842                taken_handle,
843                taken_cancel,
844                BackgroundTaskEvent::Cancelled {
845                    record: record.status.clone(),
846                },
847            )
848        };
849        if let Some(handle) = handle {
850            handle.abort();
851        }
852        if let Some(cancel) = cancel {
853            cancel().await;
854        }
855        self.publish(event);
856        Ok(())
857    }
858
859    async fn list_managed(&self, session_id: &str) -> Vec<BackgroundTaskRecord> {
860        let managed = self.managed.lock().await;
861        let Some(tasks) = managed.get(session_id) else {
862            return Vec::new();
863        };
864        let mut out: Vec<BackgroundTaskRecord> =
865            tasks.values().map(|record| record.status.clone()).collect();
866        out.sort_by_key(|left| left.created_at);
867        out
868    }
869
870    async fn get_managed(&self, session_id: &str, task_id: &str) -> Option<BackgroundTaskRecord> {
871        let managed = self.managed.lock().await;
872        managed
873            .get(session_id)
874            .and_then(|tasks| tasks.get(task_id))
875            .map(|record| record.status.clone())
876    }
877
878    async fn transfer_managed(
879        &self,
880        from_session_id: &str,
881        to_session_id: &str,
882        task_ids: &[String],
883    ) -> Result<(), PluginError> {
884        if from_session_id == to_session_id || task_ids.is_empty() {
885            return Ok(());
886        }
887        let mut managed = self.managed.lock().await;
888        for task_id in task_ids {
889            if managed
890                .get(to_session_id)
891                .and_then(|tasks| tasks.get(task_id))
892                .is_some_and(|record| !record.status.state.is_terminal())
893            {
894                return Err(PluginError::Session(format!(
895                    "background task `{task_id}` already exists in successor session"
896                )));
897            }
898        }
899
900        let mut moved = Vec::new();
901        if let Some(from_tasks) = managed.get_mut(from_session_id) {
902            for task_id in task_ids {
903                if let Some(record) = from_tasks.remove(task_id) {
904                    moved.push((task_id.clone(), record));
905                }
906            }
907            if from_tasks.is_empty() {
908                managed.remove(from_session_id);
909            }
910        }
911        if moved.is_empty() {
912            return Ok(());
913        }
914        let to_tasks = managed.entry(to_session_id.to_string()).or_default();
915        for (task_id, record) in moved {
916            to_tasks.insert(task_id.clone(), record);
917            self.publish(BackgroundTaskEvent::Transferred {
918                task_id: task_id.clone(),
919                scope: BackgroundTaskScope {
920                    session_id: to_session_id.to_string(),
921                },
922            });
923        }
924        Ok(())
925    }
926
927    async fn cancel_all_managed(
928        &self,
929        session_id: &str,
930    ) -> Result<Vec<BackgroundTaskRecord>, PluginError> {
931        let live_task_ids = {
932            let managed = self.managed.lock().await;
933            managed
934                .get(session_id)
935                .map(|tasks| {
936                    tasks
937                        .values()
938                        .filter(|record| !record.status.state.is_terminal())
939                        .map(|record| record.status.id.clone())
940                        .collect::<Vec<_>>()
941                })
942                .unwrap_or_default()
943        };
944        let mut out = Vec::new();
945        for task_id in live_task_ids {
946            self.cancel_managed(session_id, &task_id).await?;
947            if let Some(status) = self.get_managed(session_id, &task_id).await {
948                out.push(status);
949            }
950        }
951        Ok(out)
952    }
953}
954
955/// Required host configuration for all runtimes.
956#[derive(Clone)]
957pub struct RuntimeCoreConfig {
958    pub attachment_store: Arc<dyn crate::AttachmentStore>,
959    pub prompt: crate::PromptLayer,
960    pub trace_sink: Option<Arc<dyn TraceSink>>,
961    pub trace_level: TraceLevel,
962    pub trace_context: TraceContext,
963    pub termination: TerminationPolicy,
964}
965
966impl Default for RuntimeCoreConfig {
967    fn default() -> Self {
968        Self {
969            attachment_store: Arc::new(crate::InMemoryAttachmentStore::new()),
970            prompt: crate::PromptLayer::new(),
971            trace_sink: None,
972            trace_level: TraceLevel::Standard,
973            trace_context: TraceContext::default(),
974            termination: TerminationPolicy::default(),
975        }
976    }
977}
978
979impl RuntimeCoreConfig {
980    pub fn with_attachment_store(
981        mut self,
982        attachment_store: Arc<dyn crate::AttachmentStore>,
983    ) -> Self {
984        self.attachment_store = attachment_store;
985        self
986    }
987
988    pub fn with_prompt_template(mut self, prompt_template: crate::PromptTemplate) -> Self {
989        self.prompt.template = Some(prompt_template);
990        self
991    }
992
993    pub fn with_prompt_contribution(mut self, contribution: crate::PromptContribution) -> Self {
994        self.prompt.add_contribution(contribution);
995        self
996    }
997
998    pub fn with_replaced_prompt_slot(
999        mut self,
1000        slot: crate::PromptSlot,
1001        contributions: impl IntoIterator<Item = crate::PromptContribution>,
1002    ) -> Self {
1003        self.prompt.replace_slot(slot, contributions);
1004        self
1005    }
1006
1007    pub fn with_cleared_prompt_slot(mut self, slot: crate::PromptSlot) -> Self {
1008        self.prompt.clear_slot(slot);
1009        self
1010    }
1011
1012    pub fn with_prompt_layer(mut self, prompt: crate::PromptLayer) -> Self {
1013        self.prompt = prompt;
1014        self
1015    }
1016
1017    pub fn with_trace_jsonl_path(mut self, trace_path: Option<PathBuf>) -> Self {
1018        self.trace_sink =
1019            trace_path.map(|path| Arc::new(JsonlTraceSink::new(path)) as Arc<dyn TraceSink>);
1020        self
1021    }
1022
1023    pub fn with_trace_sink(mut self, sink: Option<Arc<dyn TraceSink>>) -> Self {
1024        self.trace_sink = sink;
1025        self
1026    }
1027
1028    pub fn with_trace_level(mut self, level: TraceLevel) -> Self {
1029        self.trace_level = level;
1030        self
1031    }
1032
1033    pub fn with_trace_context(mut self, context: TraceContext) -> Self {
1034        self.trace_context = context;
1035        self
1036    }
1037
1038    pub fn with_termination(mut self, termination: TerminationPolicy) -> Self {
1039        self.termination = termination;
1040        self
1041    }
1042}
1043
1044/// Base host shape for embedded runtimes.
1045#[derive(Clone)]
1046pub struct EmbeddedRuntimeHost {
1047    pub core: RuntimeCoreConfig,
1048    pub session_store_factory: Option<Arc<dyn SessionStoreFactory>>,
1049}
1050
1051impl EmbeddedRuntimeHost {
1052    pub fn new(core: RuntimeCoreConfig) -> Self {
1053        Self {
1054            core,
1055            session_store_factory: None,
1056        }
1057    }
1058
1059    pub fn with_session_store_factory(
1060        mut self,
1061        session_store_factory: Arc<dyn SessionStoreFactory>,
1062    ) -> Self {
1063        self.session_store_factory = Some(session_store_factory);
1064        self
1065    }
1066}
1067
1068/// Host shape for runtimes that support background plugin work.
1069#[derive(Clone)]
1070pub struct BackgroundRuntimeHost {
1071    pub embedded: EmbeddedRuntimeHost,
1072    pub background_task_host: Arc<dyn BackgroundTaskHost>,
1073}
1074
1075impl BackgroundRuntimeHost {
1076    pub fn new(
1077        embedded: EmbeddedRuntimeHost,
1078        background_task_host: Arc<dyn BackgroundTaskHost>,
1079    ) -> Self {
1080        Self {
1081            embedded,
1082            background_task_host,
1083        }
1084    }
1085}
1086
1087#[derive(Clone)]
1088pub(crate) struct RuntimeHost {
1089    pub core: RuntimeCoreConfig,
1090    pub session_store_factory: Option<Arc<dyn SessionStoreFactory>>,
1091    pub background_task_host: Option<Arc<dyn BackgroundTaskHost>>,
1092}
1093
1094impl From<EmbeddedRuntimeHost> for RuntimeHost {
1095    fn from(value: EmbeddedRuntimeHost) -> Self {
1096        Self {
1097            core: value.core,
1098            session_store_factory: value.session_store_factory,
1099            background_task_host: None,
1100        }
1101    }
1102}
1103
1104impl From<BackgroundRuntimeHost> for RuntimeHost {
1105    fn from(value: BackgroundRuntimeHost) -> Self {
1106        Self {
1107            core: value.embedded.core,
1108            session_store_factory: value.embedded.session_store_factory,
1109            background_task_host: Some(value.background_task_host),
1110        }
1111    }
1112}
1113
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Minimal registration with no parent task and no child session.
    fn spec(id: &str, kind: BackgroundTaskKind) -> BackgroundTaskRegistration {
        BackgroundTaskRegistration {
            id: id.to_owned(),
            kind,
            producer: "test",
            child_session_id: None,
            parent_task_id: None,
        }
    }

    /// Builds a cancel callback that bumps `counter` every time it is invoked.
    fn counting_cancel(counter: &Arc<AtomicUsize>) -> LocalBackgroundTaskCancel {
        let counter = Arc::clone(counter);
        Arc::new(move || {
            let counter = Arc::clone(&counter);
            Box::pin(async move {
                counter.fetch_add(1, Ordering::SeqCst);
            })
        })
    }

    /// Current state of a task; panics if the task is unknown.
    async fn state_of(
        host: &LocalBackgroundTaskHost,
        session_id: &str,
        task_id: &str,
    ) -> BackgroundTaskState {
        host.get_managed(session_id, task_id).await.unwrap().state
    }

    #[tokio::test]
    async fn background_task_spawn_managed_records_metadata_and_terminates_on_exit() {
        let host = LocalBackgroundTaskHost::default();
        host.spawn_managed(
            "s1",
            spec("t1", BackgroundTaskKind::Monitor),
            Box::pin(async { Ok(()) }),
        )
        .await
        .expect("spawn");

        // Poll up to 50 times, 10ms apart, until no task reports `Running`.
        let mut attempts_left = 50;
        while attempts_left > 0 {
            let still_running = host
                .list_managed("s1")
                .await
                .iter()
                .any(|task| matches!(task.state, BackgroundTaskState::Running));
            if !still_running {
                break;
            }
            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
            attempts_left -= 1;
        }

        let listed = host.list_managed("s1").await;
        assert_eq!(listed.len(), 1);
        let only = &listed[0];
        assert_eq!(only.kind, BackgroundTaskKind::Monitor);
        assert_eq!(only.state, BackgroundTaskState::Completed);
    }

    #[tokio::test]
    async fn background_task_mark_live_state_flips_running_and_waiting_but_preserves_terminal() {
        let host = LocalBackgroundTaskHost::default();
        host.register_external("s1", spec("sub", BackgroundTaskKind::Subagent), None)
            .await
            .expect("register");
        assert_eq!(
            state_of(&host, "s1", "sub").await,
            BackgroundTaskState::Running
        );

        // Live states can be flipped back and forth freely.
        for live in [BackgroundTaskState::Waiting, BackgroundTaskState::Running] {
            host.mark_live_state("s1", "sub", live).await;
            assert_eq!(state_of(&host, "s1", "sub").await, live);
        }

        // Once terminal, a later live-state flip must not resurrect the task.
        host.mark_terminal("s1", "sub", BackgroundTaskState::Completed)
            .await;
        host.mark_live_state("s1", "sub", BackgroundTaskState::Running)
            .await;
        assert_eq!(
            state_of(&host, "s1", "sub").await,
            BackgroundTaskState::Completed
        );
    }

    #[tokio::test]
    async fn background_task_cancel_managed_fires_external_callback_and_marks_cancelled() {
        let host = LocalBackgroundTaskHost::default();
        let invocations = Arc::new(AtomicUsize::new(0));
        host.register_external(
            "s1",
            spec("sub", BackgroundTaskKind::Subagent),
            Some(counting_cancel(&invocations)),
        )
        .await
        .expect("register");

        host.cancel_managed("s1", "sub").await.expect("cancel");

        assert_eq!(invocations.load(Ordering::SeqCst), 1);
        let record = host.get_managed("s1", "sub").await.expect("status");
        assert_eq!(record.state, BackgroundTaskState::Cancelled);
    }

    #[tokio::test]
    async fn background_task_transfer_managed_moves_live_task_visibility() {
        let host = LocalBackgroundTaskHost::default();
        host.register_external("s1", spec("monitor:one", BackgroundTaskKind::Monitor), None)
            .await
            .expect("register");

        let ids = vec!["monitor:one".to_string()];
        host.transfer_managed("s1", "s2", &ids)
            .await
            .expect("transfer");

        assert!(host.list_managed("s1").await.is_empty());
        let moved = host.list_managed("s2").await;
        assert_eq!(moved.len(), 1);
        let only = &moved[0];
        assert_eq!(only.id, "monitor:one");
        assert_eq!(only.state, BackgroundTaskState::Running);
    }

    #[tokio::test]
    async fn background_task_cancel_all_managed_cancels_each_live_task() {
        let host = LocalBackgroundTaskHost::default();
        let invocations = Arc::new(AtomicUsize::new(0));
        for id in ["a", "b"] {
            host.register_external(
                "s1",
                spec(id, BackgroundTaskKind::Other),
                Some(counting_cancel(&invocations)),
            )
            .await
            .expect("register");
        }

        let cancelled = host.cancel_all_managed("s1").await.expect("cancel all");

        assert_eq!(cancelled.len(), 2);
        assert_eq!(invocations.load(Ordering::SeqCst), 2);
        assert!(
            cancelled
                .iter()
                .all(|record| record.state == BackgroundTaskState::Cancelled)
        );
    }

    #[tokio::test]
    async fn background_task_host_contract_register_update_complete_and_filter() {
        let host = LocalBackgroundTaskHost::default();

        // Filter for non-terminal tasks of session `s1`, optionally narrowed by kind.
        let live_in_s1 = |kind: Option<BackgroundTaskKind>| BackgroundTaskFilter {
            session_id: Some("s1".to_string()),
            kind,
            include_terminal: false,
        };

        let request = BackgroundTaskRegisterRequest {
            scope: BackgroundTaskScope {
                session_id: "s1".to_string(),
            },
            id: "task:one".to_string(),
            kind: BackgroundTaskKind::Other,
            producer: "test".to_string(),
            parent_task_id: None,
            child_session_id: Some("child".to_string()),
            cancel_policy: BackgroundCancelPolicy::External,
            close_policy: BackgroundClosePolicy::Transfer,
            attempt: BackgroundTaskAttempt {
                attempt: 1,
                max_attempts: Some(3),
                idempotency_key: Some("idem".to_string()),
            },
        };
        let registered = host.register(request).await.expect("register");
        assert_eq!(registered.state, BackgroundTaskState::Pending);
        assert_eq!(registered.child_session_id.as_deref(), Some("child"));

        let running = BackgroundTaskUpdate {
            state: Some(BackgroundTaskState::Running),
            progress: Some("started".to_string()),
        };
        let updated = host.update("task:one", running).await.expect("update");
        assert_eq!(updated.state, BackgroundTaskState::Running);

        let live = host.list(live_in_s1(Some(BackgroundTaskKind::Other))).await;
        assert_eq!(live.len(), 1);

        let done = BackgroundTaskCompletion {
            state: BackgroundTaskState::Completed,
            summary: Some("done".to_string()),
        };
        let completed = host.complete("task:one", done).await.expect("complete");
        assert_eq!(completed.state, BackgroundTaskState::Completed);
        let summary = completed
            .result
            .as_ref()
            .and_then(|outcome| outcome.summary.as_deref());
        assert_eq!(summary, Some("done"));
        assert!(completed.completed_at.is_some());

        // The completed task no longer shows up among live tasks.
        assert!(host.list(live_in_s1(None)).await.is_empty());
    }
}