Skip to main content

awaken_runtime/extensions/background/
manager.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicU64, Ordering};
4
5use awaken_contract::StateError;
6use parking_lot::RwLock;
7use tokio::sync::Mutex;
8use tokio::task::JoinHandle;
9
10use crate::cancellation::{CancellationHandle, CancellationToken};
11use crate::inbox::InboxSender;
12use crate::state::{MutationBatch, StateStore};
13
14use super::state::{
15    BackgroundTaskStateAction, BackgroundTaskStateKey, BackgroundTaskStateSnapshot,
16    PersistedTaskMeta,
17};
18use super::types::{
19    AgentTaskContext, TaskContext, TaskEvent, TaskId, TaskParentContext, TaskResult, TaskStatus,
20    TaskSummary,
21};
22use super::{
23    BackgroundTaskExecutionContext, current_background_task_context, current_tool_lineage_context,
24    scope_background_task_context,
25};
26
27/// Errors from [`BackgroundTaskManager::send_task_inbox_message`].
28#[derive(Debug, Clone, PartialEq, Eq)]
29pub enum SendError {
30    /// No task with this ID exists.
31    TaskNotFound,
32    /// Caller's thread does not own the task.
33    NotOwner,
34    /// Task has already reached a terminal state.
35    TaskTerminated(TaskStatus),
36    /// Task is not a sub-agent (has no inbox).
37    NoInbox,
38    /// Inbox receiver was dropped (sub-agent ended).
39    InboxClosed,
40}
41
42impl std::fmt::Display for SendError {
43    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44        match self {
45            Self::TaskNotFound => write!(f, "task not found"),
46            Self::NotOwner => write!(f, "caller does not own this task"),
47            Self::TaskTerminated(s) => write!(f, "task already {}", s.as_str()),
48            Self::NoInbox => write!(f, "task has no inbox (not a sub-agent)"),
49            Self::InboxClosed => write!(f, "sub-agent inbox closed"),
50        }
51    }
52}
53
54impl std::error::Error for SendError {}
55
/// Task names the system keeps for itself; user-supplied task names may not
/// use any of these (checked by `BackgroundTaskManager::validate_name`).
const RESERVED_NAMES: &[&str] = &[
    "parent",
    "self",
    "all",
    "broadcast",
];
58
59/// Errors from task spawn operations.
60#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
61#[non_exhaustive]
62pub enum SpawnError {
63    /// The name is reserved by the system.
64    #[error("'{0}' is a reserved name")]
65    ReservedName(String),
66    /// Another running task on this thread already has this name.
67    #[error("a running task named '{0}' already exists")]
68    DuplicateName(String),
69    /// No state store has been configured for the manager.
70    #[error("background task state store is not configured")]
71    StoreNotConfigured,
72    /// Commit to the state store failed.
73    #[error(transparent)]
74    State(#[from] StateError),
75}
76
77#[derive(Debug, thiserror::Error)]
78enum MetaCommitError {
79    #[error("background task state store is not configured")]
80    StoreUnavailable,
81    #[error(transparent)]
82    State(#[from] StateError),
83}
84
85impl From<MetaCommitError> for SpawnError {
86    fn from(err: MetaCommitError) -> Self {
87        match err {
88            MetaCommitError::StoreUnavailable => Self::StoreNotConfigured,
89            MetaCommitError::State(e) => Self::State(e),
90        }
91    }
92}
93
94/// Runtime-only handle for a live background task.
95///
96/// Contains only non-serializable runtime handles (cancel, join, inbox).
97/// All metadata (status, error, result, timestamps) lives in the StateStore
98/// under [`BackgroundTaskStateKey`].
99struct TaskHandle {
100    task_id: TaskId,
101    owner_thread_id: String,
102    cancel_handle: CancellationHandle,
103    _join_handle: JoinHandle<()>,
104    /// Inbox sender for sub-agent tasks (allows parent to send messages).
105    agent_inbox: Option<InboxSender>,
106}
107
108/// Thread-scoped handle table for background tasks.
109///
110/// Spawns, tracks, cancels, and queries background tasks.
111/// Task metadata (status, error, result, timestamps) is stored in the
112/// [`StateStore`] as the single source of truth. This struct only holds
113/// runtime handles (cancel, join, inbox).
114pub struct BackgroundTaskManager {
115    handles: Mutex<HashMap<TaskId, TaskHandle>>,
116    counter: AtomicU64,
117    owner_inbox: RwLock<Option<InboxSender>>,
118    store: std::sync::OnceLock<StateStore>,
119}
120
121impl BackgroundTaskManager {
122    pub fn new() -> Self {
123        Self {
124            handles: Mutex::new(HashMap::new()),
125            counter: AtomicU64::new(0),
126            owner_inbox: RwLock::new(None),
127            store: std::sync::OnceLock::new(),
128        }
129    }
130
131    /// Set the inbox sender that background tasks receive for pushing
132    /// messages to the owner thread.
133    pub fn set_owner_inbox(&self, inbox: InboxSender) {
134        *self.owner_inbox.write() = Some(inbox);
135    }
136
137    /// Provide the state store for metadata persistence.
138    ///
139    /// Called once during plugin registration or run start. Subsequent
140    /// calls are silently ignored (OnceLock semantics).
141    pub fn set_store(&self, store: StateStore) {
142        let _ = self.store.set(store);
143    }
144
145    /// Validate a task name: check reserved names and uniqueness.
146    fn validate_name(&self, name: &str, owner_thread_id: &str) -> Result<(), SpawnError> {
147        if RESERVED_NAMES.contains(&name) {
148            return Err(SpawnError::ReservedName(name.to_string()));
149        }
150        // Check uniqueness among running tasks on this thread
151        if let Some(store) = self.store()
152            && let Some(snap) = store.read::<BackgroundTaskStateKey>()
153        {
154            for meta in snap.tasks.values() {
155                if meta.owner_thread_id == owner_thread_id
156                    && !meta.status.is_terminal()
157                    && meta.name.as_deref() == Some(name)
158                {
159                    return Err(SpawnError::DuplicateName(name.to_string()));
160                }
161            }
162        }
163        Ok(())
164    }
165
166    /// Returns a reference to the store, if set.
167    fn store(&self) -> Option<&StateStore> {
168        self.store.get()
169    }
170
171    fn owner_inbox(&self) -> Option<InboxSender> {
172        self.owner_inbox.read().clone()
173    }
174
175    #[cfg(test)]
176    pub(crate) fn panic_while_holding_owner_inbox_lock_for_test(&self) {
177        let _guard = self.owner_inbox.write();
178        panic!("owner_inbox lock test panic");
179    }
180
181    #[cfg(test)]
182    pub(crate) fn has_owner_inbox_for_test(&self) -> bool {
183        self.owner_inbox().is_some()
184    }
185
186    fn next_task_id(&self) -> TaskId {
187        let n = self.counter.fetch_add(1, Ordering::Relaxed);
188        format!("bg_{n}")
189    }
190
191    fn merge_ambient_parent_context(
192        &self,
193        mut parent_context: TaskParentContext,
194    ) -> TaskParentContext {
195        if parent_context.task_id.is_none()
196            && let Some(context) = current_background_task_context()
197        {
198            parent_context.task_id = Some(context.task_id);
199        }
200
201        if let Some(context) = current_tool_lineage_context() {
202            if parent_context.run_id.is_none() {
203                parent_context.run_id = Some(context.run_id);
204            }
205            if parent_context.call_id.is_none() && !context.call_id.is_empty() {
206                parent_context.call_id = Some(context.call_id);
207            }
208            if parent_context.agent_id.is_none() && !context.agent_id.is_empty() {
209                parent_context.agent_id = Some(context.agent_id);
210            }
211        }
212
213        parent_context
214    }
215
216    /// Commit a state update to the store.
217    fn commit_meta(&self, action: BackgroundTaskStateAction) -> Result<u64, MetaCommitError> {
218        let Some(store) = self.store() else {
219            return Err(MetaCommitError::StoreUnavailable);
220        };
221
222        let mut batch = MutationBatch::new();
223        batch.update::<BackgroundTaskStateKey>(action);
224        store.commit(batch).map_err(Into::into)
225    }
226
227    fn commit_meta_or_warn(
228        &self,
229        action: BackgroundTaskStateAction,
230        operation: &'static str,
231        task_id: &str,
232    ) {
233        if let Err(error) = self.commit_meta(action) {
234            metrics::counter!(
235                "awaken_background_task_state_commit_failures_total",
236                "operation" => operation
237            )
238            .increment(1);
239            tracing::warn!(
240                operation,
241                task_id,
242                error = %error,
243                "background task metadata commit failed"
244            );
245        }
246    }
247
248    fn terminal_event(task_id: &str, result: &TaskResult) -> TaskEvent {
249        match result {
250            TaskResult::Success(val) => TaskEvent::Completed {
251                task_id: task_id.to_string(),
252                result: Some(val.clone()),
253            },
254            TaskResult::Failed(err) => TaskEvent::Failed {
255                task_id: task_id.to_string(),
256                error: err.clone(),
257            },
258            TaskResult::Cancelled => TaskEvent::Cancelled {
259                task_id: task_id.to_string(),
260            },
261        }
262    }
263
264    /// Spawn a background task.
265    ///
266    /// The `task_fn` receives a [`TaskContext`] and returns a `TaskResult`.
267    /// Spawn a background task.
268    ///
269    /// `name` is an optional short identifier for addressing (e.g. "researcher").
270    /// If provided, it must be unique among running tasks on this thread and
271    /// must not be a reserved name ("parent", "self", "all", "broadcast").
272    pub async fn spawn<F, Fut>(
273        self: &Arc<Self>,
274        owner_thread_id: &str,
275        task_type: &str,
276        name: Option<&str>,
277        description: &str,
278        parent_context: TaskParentContext,
279        task_fn: F,
280    ) -> Result<TaskId, SpawnError>
281    where
282        F: FnOnce(TaskContext) -> Fut + Send + 'static,
283        Fut: std::future::Future<Output = TaskResult> + Send + 'static,
284    {
285        let parent_context = self.merge_ambient_parent_context(parent_context);
286        if let Some(n) = name {
287            self.validate_name(n, owner_thread_id)?;
288        }
289        let task_id = self.next_task_id();
290        let (cancel_handle, cancel_token) = CancellationToken::new_pair();
291        let now = now_ms();
292
293        let ctx = TaskContext {
294            task_id: task_id.clone(),
295            cancel_token,
296            inbox: self.owner_inbox(),
297        };
298
299        let task_name = name.map(|n| n.to_string());
300
301        // Commit initial metadata to the store
302        self.commit_meta(BackgroundTaskStateAction::Upsert(Box::new(
303            PersistedTaskMeta {
304                task_id: task_id.clone(),
305                owner_thread_id: owner_thread_id.to_string(),
306                task_type: task_type.to_string(),
307                name: task_name.clone(),
308                description: description.to_string(),
309                status: TaskStatus::Running,
310                error: None,
311                result: None,
312                created_at_ms: now,
313                completed_at_ms: None,
314                parent_context: parent_context.clone(),
315            },
316        )))
317        .map_err(SpawnError::from)?;
318
319        let manager = Arc::clone(self);
320        let tid = task_id.clone();
321        let owner_inbox = self.owner_inbox();
322        let owner = owner_thread_id.to_string();
323        let ttype = task_type.to_string();
324        let tname = task_name.clone();
325        let desc = description.to_string();
326
327        let join_handle = tokio::spawn(async move {
328            let result = scope_background_task_context(
329                BackgroundTaskExecutionContext {
330                    manager: manager.clone(),
331                    task_id: tid.clone(),
332                },
333                task_fn(ctx),
334            )
335            .await;
336            let completed_at = now_ms();
337
338            // Update metadata in the store
339            let (status, error, result_val) = match &result {
340                TaskResult::Success(val) => (TaskStatus::Completed, None, Some(val.clone())),
341                TaskResult::Failed(err) => (TaskStatus::Failed, Some(err.clone()), None),
342                TaskResult::Cancelled => (TaskStatus::Cancelled, None, None),
343            };
344
345            manager.commit_meta_or_warn(
346                BackgroundTaskStateAction::Upsert(Box::new(PersistedTaskMeta {
347                    task_id: tid.clone(),
348                    owner_thread_id: owner,
349                    task_type: ttype,
350                    name: tname,
351                    description: desc,
352                    status,
353                    error,
354                    result: result_val,
355                    created_at_ms: now,
356                    completed_at_ms: Some(completed_at),
357                    parent_context,
358                })),
359                "task_completion",
360                &tid,
361            );
362
363            // Notify owner via inbox (terminal event).
364            if let Some(ref inbox) = owner_inbox {
365                let event = Self::terminal_event(&tid, &result);
366                inbox.send(
367                    serde_json::to_value(&event).expect("TaskEvent serialization is infallible"),
368                );
369            }
370        });
371
372        let handle = TaskHandle {
373            task_id: task_id.clone(),
374            owner_thread_id: owner_thread_id.to_string(),
375            cancel_handle,
376            _join_handle: join_handle,
377            agent_inbox: None,
378        };
379
380        self.handles.lock().await.insert(task_id.clone(), handle);
381        Ok(task_id)
382    }
383
384    /// Cancel a running task.
385    pub async fn cancel(&self, task_id: &str) -> bool {
386        let handles = self.handles.lock().await;
387        if let Some(handle) = handles.get(task_id) {
388            // Check status from the store
389            if let Some(store) = self.store()
390                && let Some(snap) = store.read::<BackgroundTaskStateKey>()
391                && let Some(meta) = snap.tasks.get(task_id)
392                && meta.status.is_terminal()
393            {
394                return false;
395            }
396            handle.cancel_handle.cancel();
397            return true;
398        }
399        false
400    }
401
402    /// Cancel a task and every known descendant task in the same manager.
403    ///
404    /// Descendants are discovered through `TaskParentContext.task_id`.
405    /// Returns the number of live tasks whose cancellation token was signalled.
406    pub async fn cancel_tree(&self, task_id: &str) -> usize {
407        let Some(task_ids) = self.task_tree_ids(task_id) else {
408            return 0;
409        };
410
411        let handles = self.handles.lock().await;
412        let store_snap = self
413            .store()
414            .and_then(|s| s.read::<BackgroundTaskStateKey>());
415        let mut count = 0;
416        for task_id in task_ids {
417            let Some(handle) = handles.get(&task_id) else {
418                continue;
419            };
420            let is_terminal = store_snap
421                .as_ref()
422                .and_then(|snap| snap.tasks.get(&task_id))
423                .map(|meta| meta.status.is_terminal())
424                .unwrap_or(false);
425            if !is_terminal {
426                handle.cancel_handle.cancel();
427                count += 1;
428            }
429        }
430        count
431    }
432
433    /// Cancel all running tasks for a given thread.
434    /// Returns the number of tasks cancelled.
435    pub async fn cancel_all(&self, owner_thread_id: &str) -> usize {
436        let handles = self.handles.lock().await;
437        let store_snap = self
438            .store()
439            .and_then(|s| s.read::<BackgroundTaskStateKey>());
440        let mut count = 0;
441        for handle in handles.values() {
442            if handle.owner_thread_id != owner_thread_id {
443                continue;
444            }
445            let is_terminal = store_snap
446                .as_ref()
447                .and_then(|snap| snap.tasks.get(&handle.task_id))
448                .map(|m| m.status.is_terminal())
449                .unwrap_or(false);
450            if !is_terminal {
451                handle.cancel_handle.cancel();
452                count += 1;
453            }
454        }
455        count
456    }
457
458    /// List all tasks for a given owner thread.
459    pub async fn list(&self, owner_thread_id: &str) -> Vec<TaskSummary> {
460        if let Some(store) = self.store()
461            && let Some(snap) = store.read::<BackgroundTaskStateKey>()
462        {
463            return snap
464                .tasks
465                .values()
466                .filter(|m| m.owner_thread_id == owner_thread_id)
467                .map(Self::meta_to_summary)
468                .collect();
469        }
470        Vec::new()
471    }
472
473    /// Get the summary of a specific task.
474    pub async fn get(&self, task_id: &str) -> Option<TaskSummary> {
475        self.store()
476            .and_then(|s| s.read::<BackgroundTaskStateKey>())
477            .and_then(|snap| snap.tasks.get(task_id).map(Self::meta_to_summary))
478    }
479
480    fn meta_to_summary(m: &PersistedTaskMeta) -> TaskSummary {
481        TaskSummary {
482            task_id: m.task_id.clone(),
483            task_type: m.task_type.clone(),
484            description: m.description.clone(),
485            status: m.status,
486            error: m.error.clone(),
487            result: m.result.clone(),
488            created_at_ms: m.created_at_ms,
489            completed_at_ms: m.completed_at_ms,
490            parent_context: m.parent_context.clone(),
491        }
492    }
493
494    /// Restore persisted task metadata from a snapshot into the store.
495    pub(crate) async fn restore_for_thread(
496        &self,
497        owner_thread_id: &str,
498        snapshot: &BackgroundTaskStateSnapshot,
499    ) {
500        // First, write the snapshot data into the store
501        if let Some(store) = self.store() {
502            // Merge with existing data: only add tasks not already present
503            let existing = store.read::<BackgroundTaskStateKey>().unwrap_or_default();
504
505            for (task_id, meta) in &snapshot.tasks {
506                if existing.tasks.contains_key(task_id) {
507                    continue;
508                }
509
510                // Update counter
511                if let Some(n) = task_id
512                    .strip_prefix("bg_")
513                    .and_then(|s| s.parse::<u64>().ok())
514                {
515                    self.counter
516                        .fetch_max(n.saturating_add(1), Ordering::Relaxed);
517                }
518
519                let handles = self.handles.lock().await;
520                let has_live_handle = handles.contains_key(task_id);
521                drop(handles);
522
523                let mut to_store = meta.clone();
524                to_store.owner_thread_id = owner_thread_id.to_string();
525
526                // Orphan detection
527                if meta.status == TaskStatus::Running && !has_live_handle {
528                    to_store.status = TaskStatus::Failed;
529                    to_store.error =
530                        Some("task orphaned: runtime restarted while running".to_string());
531                }
532
533                self.commit_meta_or_warn(
534                    BackgroundTaskStateAction::Upsert(Box::new(to_store)),
535                    "restore_task_metadata",
536                    task_id,
537                );
538            }
539        }
540    }
541
542    /// Returns true if any task for the given thread is still running.
543    pub async fn has_running(&self, owner_thread_id: &str) -> bool {
544        if let Some(store) = self.store()
545            && let Some(snap) = store.read::<BackgroundTaskStateKey>()
546        {
547            return snap
548                .tasks
549                .values()
550                .any(|m| m.owner_thread_id == owner_thread_id && !m.status.is_terminal());
551        }
552        // Fallback: check handles if store not available
553        self.handles
554            .lock()
555            .await
556            .values()
557            .any(|h| h.owner_thread_id == owner_thread_id)
558    }
559
560    /// Spawn a sub-agent as a background task with its own inbox.
561    ///
562    /// `name` is an optional short identifier for addressing via `send_message`.
563    pub async fn spawn_agent<F, Fut>(
564        self: &Arc<Self>,
565        owner_thread_id: &str,
566        name: Option<&str>,
567        description: &str,
568        parent_context: TaskParentContext,
569        task_fn: F,
570    ) -> Result<TaskId, SpawnError>
571    where
572        F: FnOnce(CancellationToken, InboxSender, crate::inbox::InboxReceiver) -> Fut
573            + Send
574            + 'static,
575        Fut: std::future::Future<Output = TaskResult> + Send + 'static,
576    {
577        self.spawn_agent_with_context(owner_thread_id, name, description, parent_context, |ctx| {
578            task_fn(ctx.cancel_token, ctx.inbox_sender, ctx.inbox_receiver)
579        })
580        .await
581    }
582
583    /// Spawn a sub-agent as a background task while exposing the spawned task
584    /// ID to the closure for lineage-aware coordination.
585    pub async fn spawn_agent_with_context<F, Fut>(
586        self: &Arc<Self>,
587        owner_thread_id: &str,
588        name: Option<&str>,
589        description: &str,
590        parent_context: TaskParentContext,
591        task_fn: F,
592    ) -> Result<TaskId, SpawnError>
593    where
594        F: FnOnce(AgentTaskContext) -> Fut + Send + 'static,
595        Fut: std::future::Future<Output = TaskResult> + Send + 'static,
596    {
597        let parent_context = self.merge_ambient_parent_context(parent_context);
598        if let Some(n) = name {
599            self.validate_name(n, owner_thread_id)?;
600        }
601        let task_id = self.next_task_id();
602        let (cancel_handle, cancel_token) = CancellationToken::new_pair();
603        let now = now_ms();
604
605        let (child_inbox_tx, child_inbox_rx) = crate::inbox::inbox_channel();
606        let stored_sender = child_inbox_tx.clone();
607
608        let task_name = name.map(|n| n.to_string());
609
610        // Commit initial metadata
611        self.commit_meta(BackgroundTaskStateAction::Upsert(Box::new(
612            PersistedTaskMeta {
613                task_id: task_id.clone(),
614                owner_thread_id: owner_thread_id.to_string(),
615                task_type: "sub_agent".to_string(),
616                name: task_name.clone(),
617                description: description.to_string(),
618                status: TaskStatus::Running,
619                error: None,
620                result: None,
621                created_at_ms: now,
622                completed_at_ms: None,
623                parent_context: parent_context.clone(),
624            },
625        )))
626        .map_err(SpawnError::from)?;
627
628        let manager = Arc::clone(self);
629        let tid = task_id.clone();
630        let owner_inbox = self.owner_inbox();
631        let owner = owner_thread_id.to_string();
632        let tname = task_name.clone();
633        let desc = description.to_string();
634
635        let join_handle = tokio::spawn(async move {
636            let result = scope_background_task_context(
637                BackgroundTaskExecutionContext {
638                    manager: manager.clone(),
639                    task_id: tid.clone(),
640                },
641                task_fn(AgentTaskContext {
642                    task_id: tid.clone(),
643                    cancel_token,
644                    inbox_sender: child_inbox_tx,
645                    inbox_receiver: child_inbox_rx,
646                }),
647            )
648            .await;
649            let completed_at = now_ms();
650
651            let (status, error, result_val) = match &result {
652                TaskResult::Success(val) => (TaskStatus::Completed, None, Some(val.clone())),
653                TaskResult::Failed(err) => (TaskStatus::Failed, Some(err.clone()), None),
654                TaskResult::Cancelled => (TaskStatus::Cancelled, None, None),
655            };
656
657            manager.commit_meta_or_warn(
658                BackgroundTaskStateAction::Upsert(Box::new(PersistedTaskMeta {
659                    task_id: tid.clone(),
660                    owner_thread_id: owner,
661                    task_type: "sub_agent".to_string(),
662                    name: tname,
663                    description: desc,
664                    status,
665                    error,
666                    result: result_val,
667                    created_at_ms: now,
668                    completed_at_ms: Some(completed_at),
669                    parent_context,
670                })),
671                "sub_agent_completion",
672                &tid,
673            );
674
675            let event = Self::terminal_event(&tid, &result);
676            if let Some(ref inbox) = owner_inbox {
677                inbox.send(
678                    serde_json::to_value(&event).expect("TaskEvent serialization is infallible"),
679                );
680            }
681        });
682
683        let handle = TaskHandle {
684            task_id: task_id.clone(),
685            owner_thread_id: owner_thread_id.to_string(),
686            cancel_handle,
687            _join_handle: join_handle,
688            agent_inbox: Some(stored_sender),
689        };
690
691        self.handles.lock().await.insert(task_id.clone(), handle);
692        Ok(task_id)
693    }
694
695    /// Send a message to a child task's live inbox.
696    ///
697    /// This is the internal low-latency transport for parent→child
698    /// communication within the same process. For cross-agent or durable
699    /// messaging, use the mailbox-based `send_message` tool instead.
700    pub async fn send_task_inbox_message(
701        &self,
702        task_id: &str,
703        owner_thread_id: &str,
704        sender_agent_id: &str,
705        content: &str,
706    ) -> Result<(), SendError> {
707        let handles = self.handles.lock().await;
708        let handle = handles.get(task_id).ok_or(SendError::TaskNotFound)?;
709
710        // Authorization: sender must be on the same thread that owns the task
711        if handle.owner_thread_id != owner_thread_id {
712            return Err(SendError::NotOwner);
713        }
714
715        // Check status from the store
716        if let Some(store) = self.store()
717            && let Some(snap) = store.read::<BackgroundTaskStateKey>()
718            && let Some(meta) = snap.tasks.get(task_id)
719            && meta.status.is_terminal()
720        {
721            return Err(SendError::TaskTerminated(meta.status));
722        }
723
724        let inbox = handle.agent_inbox.as_ref().ok_or(SendError::NoInbox)?;
725
726        let event = TaskEvent::Custom {
727            task_id: task_id.to_string(),
728            event_type: "agent_message".to_string(),
729            payload: serde_json::json!({
730                "from": sender_agent_id,
731                "content": content,
732            }),
733        };
734
735        if inbox.send(serde_json::to_value(&event).expect("TaskEvent serialization is infallible"))
736        {
737            Ok(())
738        } else {
739            Err(SendError::InboxClosed)
740        }
741    }
742
743    pub(crate) fn task_tree_ids(&self, task_id: &str) -> Option<Vec<TaskId>> {
744        let snapshot = self
745            .store()
746            .and_then(|store| store.read::<BackgroundTaskStateKey>())?;
747        if !snapshot.tasks.contains_key(task_id) {
748            return None;
749        }
750
751        let mut ordered = Vec::new();
752        let mut stack = vec![task_id.to_string()];
753        while let Some(current) = stack.pop() {
754            if ordered.iter().any(|seen| seen == &current) {
755                continue;
756            }
757            ordered.push(current.clone());
758            for meta in snapshot.tasks.values() {
759                if meta.parent_context.task_id.as_deref() == Some(current.as_str()) {
760                    stack.push(meta.task_id.clone());
761                }
762            }
763        }
764        Some(ordered)
765    }
766
767    pub(crate) fn resolve_live_child_task(
768        &self,
769        parent_task_id: &str,
770        name_or_task_id: &str,
771    ) -> Option<TaskId> {
772        let snapshot = self.store()?.read::<BackgroundTaskStateKey>()?;
773        for meta in snapshot.tasks.values() {
774            if meta.status.is_terminal() {
775                continue;
776            }
777            if meta.parent_context.task_id.as_deref() != Some(parent_task_id) {
778                continue;
779            }
780            if meta.task_id == name_or_task_id || meta.name.as_deref() == Some(name_or_task_id) {
781                return Some(meta.task_id.clone());
782            }
783        }
784        None
785    }
786
787    pub(crate) fn resolve_live_child_run(
788        &self,
789        parent_run_id: &str,
790        name_or_task_id: &str,
791    ) -> Option<TaskId> {
792        let snapshot = self.store()?.read::<BackgroundTaskStateKey>()?;
793        for meta in snapshot.tasks.values() {
794            if meta.status.is_terminal() {
795                continue;
796            }
797            if meta.parent_context.run_id.as_deref() != Some(parent_run_id)
798                || meta.parent_context.task_id.is_some()
799            {
800                continue;
801            }
802            if meta.task_id == name_or_task_id || meta.name.as_deref() == Some(name_or_task_id) {
803                return Some(meta.task_id.clone());
804            }
805        }
806        None
807    }
808
809    pub async fn cancel_descendants_for_run(&self, parent_run_id: &str) -> usize {
810        let root_task_ids = self
811            .store()
812            .and_then(|store| store.read::<BackgroundTaskStateKey>())
813            .map(|snapshot| {
814                snapshot
815                    .tasks
816                    .values()
817                    .filter(|meta| {
818                        !meta.status.is_terminal()
819                            && meta.parent_context.run_id.as_deref() == Some(parent_run_id)
820                            && meta.parent_context.task_id.is_none()
821                    })
822                    .map(|meta| meta.task_id.clone())
823                    .collect::<Vec<_>>()
824            })
825            .unwrap_or_default();
826
827        let mut cancelled = 0usize;
828        for task_id in root_task_ids {
829            cancelled += self.cancel_tree(&task_id).await;
830        }
831        cancelled
832    }
833
834    #[cfg(test)]
835    pub(crate) async fn persisted_snapshot(&self) -> HashMap<TaskId, PersistedTaskMeta> {
836        if let Some(store) = self.store()
837            && let Some(snap) = store.read::<BackgroundTaskStateKey>()
838        {
839            return snap.tasks;
840        }
841        HashMap::new()
842    }
843}
844
845impl Default for BackgroundTaskManager {
846    fn default() -> Self {
847        Self::new()
848    }
849}
850
851use awaken_contract::now_ms;