1use std::collections::HashMap;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicU64, Ordering};
4
5use awaken_contract::StateError;
6use parking_lot::RwLock;
7use tokio::sync::Mutex;
8use tokio::task::JoinHandle;
9
10use crate::cancellation::{CancellationHandle, CancellationToken};
11use crate::inbox::InboxSender;
12use crate::state::{MutationBatch, StateStore};
13
14use super::state::{
15 BackgroundTaskStateAction, BackgroundTaskStateKey, BackgroundTaskStateSnapshot,
16 PersistedTaskMeta,
17};
18use super::types::{
19 AgentTaskContext, TaskContext, TaskEvent, TaskId, TaskParentContext, TaskResult, TaskStatus,
20 TaskSummary,
21};
22use super::{
23 BackgroundTaskExecutionContext, current_background_task_context, current_tool_lineage_context,
24 scope_background_task_context,
25};
26
27#[derive(Debug, Clone, PartialEq, Eq)]
29pub enum SendError {
30 TaskNotFound,
32 NotOwner,
34 TaskTerminated(TaskStatus),
36 NoInbox,
38 InboxClosed,
40}
41
42impl std::fmt::Display for SendError {
43 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44 match self {
45 Self::TaskNotFound => write!(f, "task not found"),
46 Self::NotOwner => write!(f, "caller does not own this task"),
47 Self::TaskTerminated(s) => write!(f, "task already {}", s.as_str()),
48 Self::NoInbox => write!(f, "task has no inbox (not a sub-agent)"),
49 Self::InboxClosed => write!(f, "sub-agent inbox closed"),
50 }
51 }
52}
53
54impl std::error::Error for SendError {}
55
56const RESERVED_NAMES: &[&str] = &["parent", "self", "all", "broadcast"];
58
59#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
61#[non_exhaustive]
62pub enum SpawnError {
63 #[error("'{0}' is a reserved name")]
65 ReservedName(String),
66 #[error("a running task named '{0}' already exists")]
68 DuplicateName(String),
69 #[error("background task state store is not configured")]
71 StoreNotConfigured,
72 #[error(transparent)]
74 State(#[from] StateError),
75}
76
77#[derive(Debug, thiserror::Error)]
78enum MetaCommitError {
79 #[error("background task state store is not configured")]
80 StoreUnavailable,
81 #[error(transparent)]
82 State(#[from] StateError),
83}
84
85impl From<MetaCommitError> for SpawnError {
86 fn from(err: MetaCommitError) -> Self {
87 match err {
88 MetaCommitError::StoreUnavailable => Self::StoreNotConfigured,
89 MetaCommitError::State(e) => Self::State(e),
90 }
91 }
92}
93
94struct TaskHandle {
100 task_id: TaskId,
101 owner_thread_id: String,
102 cancel_handle: CancellationHandle,
103 _join_handle: JoinHandle<()>,
104 agent_inbox: Option<InboxSender>,
106}
107
108pub struct BackgroundTaskManager {
115 handles: Mutex<HashMap<TaskId, TaskHandle>>,
116 counter: AtomicU64,
117 owner_inbox: RwLock<Option<InboxSender>>,
118 store: std::sync::OnceLock<StateStore>,
119}
120
121impl BackgroundTaskManager {
122 pub fn new() -> Self {
123 Self {
124 handles: Mutex::new(HashMap::new()),
125 counter: AtomicU64::new(0),
126 owner_inbox: RwLock::new(None),
127 store: std::sync::OnceLock::new(),
128 }
129 }
130
131 pub fn set_owner_inbox(&self, inbox: InboxSender) {
134 *self.owner_inbox.write() = Some(inbox);
135 }
136
137 pub fn set_store(&self, store: StateStore) {
142 let _ = self.store.set(store);
143 }
144
145 fn validate_name(&self, name: &str, owner_thread_id: &str) -> Result<(), SpawnError> {
147 if RESERVED_NAMES.contains(&name) {
148 return Err(SpawnError::ReservedName(name.to_string()));
149 }
150 if let Some(store) = self.store()
152 && let Some(snap) = store.read::<BackgroundTaskStateKey>()
153 {
154 for meta in snap.tasks.values() {
155 if meta.owner_thread_id == owner_thread_id
156 && !meta.status.is_terminal()
157 && meta.name.as_deref() == Some(name)
158 {
159 return Err(SpawnError::DuplicateName(name.to_string()));
160 }
161 }
162 }
163 Ok(())
164 }
165
166 fn store(&self) -> Option<&StateStore> {
168 self.store.get()
169 }
170
171 fn owner_inbox(&self) -> Option<InboxSender> {
172 self.owner_inbox.read().clone()
173 }
174
175 #[cfg(test)]
176 pub(crate) fn panic_while_holding_owner_inbox_lock_for_test(&self) {
177 let _guard = self.owner_inbox.write();
178 panic!("owner_inbox lock test panic");
179 }
180
181 #[cfg(test)]
182 pub(crate) fn has_owner_inbox_for_test(&self) -> bool {
183 self.owner_inbox().is_some()
184 }
185
186 fn next_task_id(&self) -> TaskId {
187 let n = self.counter.fetch_add(1, Ordering::Relaxed);
188 format!("bg_{n}")
189 }
190
191 fn merge_ambient_parent_context(
192 &self,
193 mut parent_context: TaskParentContext,
194 ) -> TaskParentContext {
195 if parent_context.task_id.is_none()
196 && let Some(context) = current_background_task_context()
197 {
198 parent_context.task_id = Some(context.task_id);
199 }
200
201 if let Some(context) = current_tool_lineage_context() {
202 if parent_context.run_id.is_none() {
203 parent_context.run_id = Some(context.run_id);
204 }
205 if parent_context.call_id.is_none() && !context.call_id.is_empty() {
206 parent_context.call_id = Some(context.call_id);
207 }
208 if parent_context.agent_id.is_none() && !context.agent_id.is_empty() {
209 parent_context.agent_id = Some(context.agent_id);
210 }
211 }
212
213 parent_context
214 }
215
216 fn commit_meta(&self, action: BackgroundTaskStateAction) -> Result<u64, MetaCommitError> {
218 let Some(store) = self.store() else {
219 return Err(MetaCommitError::StoreUnavailable);
220 };
221
222 let mut batch = MutationBatch::new();
223 batch.update::<BackgroundTaskStateKey>(action);
224 store.commit(batch).map_err(Into::into)
225 }
226
227 fn commit_meta_or_warn(
228 &self,
229 action: BackgroundTaskStateAction,
230 operation: &'static str,
231 task_id: &str,
232 ) {
233 if let Err(error) = self.commit_meta(action) {
234 metrics::counter!(
235 "awaken_background_task_state_commit_failures_total",
236 "operation" => operation
237 )
238 .increment(1);
239 tracing::warn!(
240 operation,
241 task_id,
242 error = %error,
243 "background task metadata commit failed"
244 );
245 }
246 }
247
248 fn terminal_event(task_id: &str, result: &TaskResult) -> TaskEvent {
249 match result {
250 TaskResult::Success(val) => TaskEvent::Completed {
251 task_id: task_id.to_string(),
252 result: Some(val.clone()),
253 },
254 TaskResult::Failed(err) => TaskEvent::Failed {
255 task_id: task_id.to_string(),
256 error: err.clone(),
257 },
258 TaskResult::Cancelled => TaskEvent::Cancelled {
259 task_id: task_id.to_string(),
260 },
261 }
262 }
263
264 pub async fn spawn<F, Fut>(
273 self: &Arc<Self>,
274 owner_thread_id: &str,
275 task_type: &str,
276 name: Option<&str>,
277 description: &str,
278 parent_context: TaskParentContext,
279 task_fn: F,
280 ) -> Result<TaskId, SpawnError>
281 where
282 F: FnOnce(TaskContext) -> Fut + Send + 'static,
283 Fut: std::future::Future<Output = TaskResult> + Send + 'static,
284 {
285 let parent_context = self.merge_ambient_parent_context(parent_context);
286 if let Some(n) = name {
287 self.validate_name(n, owner_thread_id)?;
288 }
289 let task_id = self.next_task_id();
290 let (cancel_handle, cancel_token) = CancellationToken::new_pair();
291 let now = now_ms();
292
293 let ctx = TaskContext {
294 task_id: task_id.clone(),
295 cancel_token,
296 inbox: self.owner_inbox(),
297 };
298
299 let task_name = name.map(|n| n.to_string());
300
301 self.commit_meta(BackgroundTaskStateAction::Upsert(Box::new(
303 PersistedTaskMeta {
304 task_id: task_id.clone(),
305 owner_thread_id: owner_thread_id.to_string(),
306 task_type: task_type.to_string(),
307 name: task_name.clone(),
308 description: description.to_string(),
309 status: TaskStatus::Running,
310 error: None,
311 result: None,
312 created_at_ms: now,
313 completed_at_ms: None,
314 parent_context: parent_context.clone(),
315 },
316 )))
317 .map_err(SpawnError::from)?;
318
319 let manager = Arc::clone(self);
320 let tid = task_id.clone();
321 let owner_inbox = self.owner_inbox();
322 let owner = owner_thread_id.to_string();
323 let ttype = task_type.to_string();
324 let tname = task_name.clone();
325 let desc = description.to_string();
326
327 let join_handle = tokio::spawn(async move {
328 let result = scope_background_task_context(
329 BackgroundTaskExecutionContext {
330 manager: manager.clone(),
331 task_id: tid.clone(),
332 },
333 task_fn(ctx),
334 )
335 .await;
336 let completed_at = now_ms();
337
338 let (status, error, result_val) = match &result {
340 TaskResult::Success(val) => (TaskStatus::Completed, None, Some(val.clone())),
341 TaskResult::Failed(err) => (TaskStatus::Failed, Some(err.clone()), None),
342 TaskResult::Cancelled => (TaskStatus::Cancelled, None, None),
343 };
344
345 manager.commit_meta_or_warn(
346 BackgroundTaskStateAction::Upsert(Box::new(PersistedTaskMeta {
347 task_id: tid.clone(),
348 owner_thread_id: owner,
349 task_type: ttype,
350 name: tname,
351 description: desc,
352 status,
353 error,
354 result: result_val,
355 created_at_ms: now,
356 completed_at_ms: Some(completed_at),
357 parent_context,
358 })),
359 "task_completion",
360 &tid,
361 );
362
363 if let Some(ref inbox) = owner_inbox {
365 let event = Self::terminal_event(&tid, &result);
366 inbox.send(
367 serde_json::to_value(&event).expect("TaskEvent serialization is infallible"),
368 );
369 }
370 });
371
372 let handle = TaskHandle {
373 task_id: task_id.clone(),
374 owner_thread_id: owner_thread_id.to_string(),
375 cancel_handle,
376 _join_handle: join_handle,
377 agent_inbox: None,
378 };
379
380 self.handles.lock().await.insert(task_id.clone(), handle);
381 Ok(task_id)
382 }
383
384 pub async fn cancel(&self, task_id: &str) -> bool {
386 let handles = self.handles.lock().await;
387 if let Some(handle) = handles.get(task_id) {
388 if let Some(store) = self.store()
390 && let Some(snap) = store.read::<BackgroundTaskStateKey>()
391 && let Some(meta) = snap.tasks.get(task_id)
392 && meta.status.is_terminal()
393 {
394 return false;
395 }
396 handle.cancel_handle.cancel();
397 return true;
398 }
399 false
400 }
401
402 pub async fn cancel_tree(&self, task_id: &str) -> usize {
407 let Some(task_ids) = self.task_tree_ids(task_id) else {
408 return 0;
409 };
410
411 let handles = self.handles.lock().await;
412 let store_snap = self
413 .store()
414 .and_then(|s| s.read::<BackgroundTaskStateKey>());
415 let mut count = 0;
416 for task_id in task_ids {
417 let Some(handle) = handles.get(&task_id) else {
418 continue;
419 };
420 let is_terminal = store_snap
421 .as_ref()
422 .and_then(|snap| snap.tasks.get(&task_id))
423 .map(|meta| meta.status.is_terminal())
424 .unwrap_or(false);
425 if !is_terminal {
426 handle.cancel_handle.cancel();
427 count += 1;
428 }
429 }
430 count
431 }
432
433 pub async fn cancel_all(&self, owner_thread_id: &str) -> usize {
436 let handles = self.handles.lock().await;
437 let store_snap = self
438 .store()
439 .and_then(|s| s.read::<BackgroundTaskStateKey>());
440 let mut count = 0;
441 for handle in handles.values() {
442 if handle.owner_thread_id != owner_thread_id {
443 continue;
444 }
445 let is_terminal = store_snap
446 .as_ref()
447 .and_then(|snap| snap.tasks.get(&handle.task_id))
448 .map(|m| m.status.is_terminal())
449 .unwrap_or(false);
450 if !is_terminal {
451 handle.cancel_handle.cancel();
452 count += 1;
453 }
454 }
455 count
456 }
457
458 pub async fn list(&self, owner_thread_id: &str) -> Vec<TaskSummary> {
460 if let Some(store) = self.store()
461 && let Some(snap) = store.read::<BackgroundTaskStateKey>()
462 {
463 return snap
464 .tasks
465 .values()
466 .filter(|m| m.owner_thread_id == owner_thread_id)
467 .map(Self::meta_to_summary)
468 .collect();
469 }
470 Vec::new()
471 }
472
473 pub async fn get(&self, task_id: &str) -> Option<TaskSummary> {
475 self.store()
476 .and_then(|s| s.read::<BackgroundTaskStateKey>())
477 .and_then(|snap| snap.tasks.get(task_id).map(Self::meta_to_summary))
478 }
479
480 fn meta_to_summary(m: &PersistedTaskMeta) -> TaskSummary {
481 TaskSummary {
482 task_id: m.task_id.clone(),
483 task_type: m.task_type.clone(),
484 description: m.description.clone(),
485 status: m.status,
486 error: m.error.clone(),
487 result: m.result.clone(),
488 created_at_ms: m.created_at_ms,
489 completed_at_ms: m.completed_at_ms,
490 parent_context: m.parent_context.clone(),
491 }
492 }
493
494 pub(crate) async fn restore_for_thread(
496 &self,
497 owner_thread_id: &str,
498 snapshot: &BackgroundTaskStateSnapshot,
499 ) {
500 if let Some(store) = self.store() {
502 let existing = store.read::<BackgroundTaskStateKey>().unwrap_or_default();
504
505 for (task_id, meta) in &snapshot.tasks {
506 if existing.tasks.contains_key(task_id) {
507 continue;
508 }
509
510 if let Some(n) = task_id
512 .strip_prefix("bg_")
513 .and_then(|s| s.parse::<u64>().ok())
514 {
515 self.counter
516 .fetch_max(n.saturating_add(1), Ordering::Relaxed);
517 }
518
519 let handles = self.handles.lock().await;
520 let has_live_handle = handles.contains_key(task_id);
521 drop(handles);
522
523 let mut to_store = meta.clone();
524 to_store.owner_thread_id = owner_thread_id.to_string();
525
526 if meta.status == TaskStatus::Running && !has_live_handle {
528 to_store.status = TaskStatus::Failed;
529 to_store.error =
530 Some("task orphaned: runtime restarted while running".to_string());
531 }
532
533 self.commit_meta_or_warn(
534 BackgroundTaskStateAction::Upsert(Box::new(to_store)),
535 "restore_task_metadata",
536 task_id,
537 );
538 }
539 }
540 }
541
542 pub async fn has_running(&self, owner_thread_id: &str) -> bool {
544 if let Some(store) = self.store()
545 && let Some(snap) = store.read::<BackgroundTaskStateKey>()
546 {
547 return snap
548 .tasks
549 .values()
550 .any(|m| m.owner_thread_id == owner_thread_id && !m.status.is_terminal());
551 }
552 self.handles
554 .lock()
555 .await
556 .values()
557 .any(|h| h.owner_thread_id == owner_thread_id)
558 }
559
560 pub async fn spawn_agent<F, Fut>(
564 self: &Arc<Self>,
565 owner_thread_id: &str,
566 name: Option<&str>,
567 description: &str,
568 parent_context: TaskParentContext,
569 task_fn: F,
570 ) -> Result<TaskId, SpawnError>
571 where
572 F: FnOnce(CancellationToken, InboxSender, crate::inbox::InboxReceiver) -> Fut
573 + Send
574 + 'static,
575 Fut: std::future::Future<Output = TaskResult> + Send + 'static,
576 {
577 self.spawn_agent_with_context(owner_thread_id, name, description, parent_context, |ctx| {
578 task_fn(ctx.cancel_token, ctx.inbox_sender, ctx.inbox_receiver)
579 })
580 .await
581 }
582
583 pub async fn spawn_agent_with_context<F, Fut>(
586 self: &Arc<Self>,
587 owner_thread_id: &str,
588 name: Option<&str>,
589 description: &str,
590 parent_context: TaskParentContext,
591 task_fn: F,
592 ) -> Result<TaskId, SpawnError>
593 where
594 F: FnOnce(AgentTaskContext) -> Fut + Send + 'static,
595 Fut: std::future::Future<Output = TaskResult> + Send + 'static,
596 {
597 let parent_context = self.merge_ambient_parent_context(parent_context);
598 if let Some(n) = name {
599 self.validate_name(n, owner_thread_id)?;
600 }
601 let task_id = self.next_task_id();
602 let (cancel_handle, cancel_token) = CancellationToken::new_pair();
603 let now = now_ms();
604
605 let (child_inbox_tx, child_inbox_rx) = crate::inbox::inbox_channel();
606 let stored_sender = child_inbox_tx.clone();
607
608 let task_name = name.map(|n| n.to_string());
609
610 self.commit_meta(BackgroundTaskStateAction::Upsert(Box::new(
612 PersistedTaskMeta {
613 task_id: task_id.clone(),
614 owner_thread_id: owner_thread_id.to_string(),
615 task_type: "sub_agent".to_string(),
616 name: task_name.clone(),
617 description: description.to_string(),
618 status: TaskStatus::Running,
619 error: None,
620 result: None,
621 created_at_ms: now,
622 completed_at_ms: None,
623 parent_context: parent_context.clone(),
624 },
625 )))
626 .map_err(SpawnError::from)?;
627
628 let manager = Arc::clone(self);
629 let tid = task_id.clone();
630 let owner_inbox = self.owner_inbox();
631 let owner = owner_thread_id.to_string();
632 let tname = task_name.clone();
633 let desc = description.to_string();
634
635 let join_handle = tokio::spawn(async move {
636 let result = scope_background_task_context(
637 BackgroundTaskExecutionContext {
638 manager: manager.clone(),
639 task_id: tid.clone(),
640 },
641 task_fn(AgentTaskContext {
642 task_id: tid.clone(),
643 cancel_token,
644 inbox_sender: child_inbox_tx,
645 inbox_receiver: child_inbox_rx,
646 }),
647 )
648 .await;
649 let completed_at = now_ms();
650
651 let (status, error, result_val) = match &result {
652 TaskResult::Success(val) => (TaskStatus::Completed, None, Some(val.clone())),
653 TaskResult::Failed(err) => (TaskStatus::Failed, Some(err.clone()), None),
654 TaskResult::Cancelled => (TaskStatus::Cancelled, None, None),
655 };
656
657 manager.commit_meta_or_warn(
658 BackgroundTaskStateAction::Upsert(Box::new(PersistedTaskMeta {
659 task_id: tid.clone(),
660 owner_thread_id: owner,
661 task_type: "sub_agent".to_string(),
662 name: tname,
663 description: desc,
664 status,
665 error,
666 result: result_val,
667 created_at_ms: now,
668 completed_at_ms: Some(completed_at),
669 parent_context,
670 })),
671 "sub_agent_completion",
672 &tid,
673 );
674
675 let event = Self::terminal_event(&tid, &result);
676 if let Some(ref inbox) = owner_inbox {
677 inbox.send(
678 serde_json::to_value(&event).expect("TaskEvent serialization is infallible"),
679 );
680 }
681 });
682
683 let handle = TaskHandle {
684 task_id: task_id.clone(),
685 owner_thread_id: owner_thread_id.to_string(),
686 cancel_handle,
687 _join_handle: join_handle,
688 agent_inbox: Some(stored_sender),
689 };
690
691 self.handles.lock().await.insert(task_id.clone(), handle);
692 Ok(task_id)
693 }
694
695 pub async fn send_task_inbox_message(
701 &self,
702 task_id: &str,
703 owner_thread_id: &str,
704 sender_agent_id: &str,
705 content: &str,
706 ) -> Result<(), SendError> {
707 let handles = self.handles.lock().await;
708 let handle = handles.get(task_id).ok_or(SendError::TaskNotFound)?;
709
710 if handle.owner_thread_id != owner_thread_id {
712 return Err(SendError::NotOwner);
713 }
714
715 if let Some(store) = self.store()
717 && let Some(snap) = store.read::<BackgroundTaskStateKey>()
718 && let Some(meta) = snap.tasks.get(task_id)
719 && meta.status.is_terminal()
720 {
721 return Err(SendError::TaskTerminated(meta.status));
722 }
723
724 let inbox = handle.agent_inbox.as_ref().ok_or(SendError::NoInbox)?;
725
726 let event = TaskEvent::Custom {
727 task_id: task_id.to_string(),
728 event_type: "agent_message".to_string(),
729 payload: serde_json::json!({
730 "from": sender_agent_id,
731 "content": content,
732 }),
733 };
734
735 if inbox.send(serde_json::to_value(&event).expect("TaskEvent serialization is infallible"))
736 {
737 Ok(())
738 } else {
739 Err(SendError::InboxClosed)
740 }
741 }
742
743 pub(crate) fn task_tree_ids(&self, task_id: &str) -> Option<Vec<TaskId>> {
744 let snapshot = self
745 .store()
746 .and_then(|store| store.read::<BackgroundTaskStateKey>())?;
747 if !snapshot.tasks.contains_key(task_id) {
748 return None;
749 }
750
751 let mut ordered = Vec::new();
752 let mut stack = vec![task_id.to_string()];
753 while let Some(current) = stack.pop() {
754 if ordered.iter().any(|seen| seen == ¤t) {
755 continue;
756 }
757 ordered.push(current.clone());
758 for meta in snapshot.tasks.values() {
759 if meta.parent_context.task_id.as_deref() == Some(current.as_str()) {
760 stack.push(meta.task_id.clone());
761 }
762 }
763 }
764 Some(ordered)
765 }
766
767 pub(crate) fn resolve_live_child_task(
768 &self,
769 parent_task_id: &str,
770 name_or_task_id: &str,
771 ) -> Option<TaskId> {
772 let snapshot = self.store()?.read::<BackgroundTaskStateKey>()?;
773 for meta in snapshot.tasks.values() {
774 if meta.status.is_terminal() {
775 continue;
776 }
777 if meta.parent_context.task_id.as_deref() != Some(parent_task_id) {
778 continue;
779 }
780 if meta.task_id == name_or_task_id || meta.name.as_deref() == Some(name_or_task_id) {
781 return Some(meta.task_id.clone());
782 }
783 }
784 None
785 }
786
787 pub(crate) fn resolve_live_child_run(
788 &self,
789 parent_run_id: &str,
790 name_or_task_id: &str,
791 ) -> Option<TaskId> {
792 let snapshot = self.store()?.read::<BackgroundTaskStateKey>()?;
793 for meta in snapshot.tasks.values() {
794 if meta.status.is_terminal() {
795 continue;
796 }
797 if meta.parent_context.run_id.as_deref() != Some(parent_run_id)
798 || meta.parent_context.task_id.is_some()
799 {
800 continue;
801 }
802 if meta.task_id == name_or_task_id || meta.name.as_deref() == Some(name_or_task_id) {
803 return Some(meta.task_id.clone());
804 }
805 }
806 None
807 }
808
809 pub async fn cancel_descendants_for_run(&self, parent_run_id: &str) -> usize {
810 let root_task_ids = self
811 .store()
812 .and_then(|store| store.read::<BackgroundTaskStateKey>())
813 .map(|snapshot| {
814 snapshot
815 .tasks
816 .values()
817 .filter(|meta| {
818 !meta.status.is_terminal()
819 && meta.parent_context.run_id.as_deref() == Some(parent_run_id)
820 && meta.parent_context.task_id.is_none()
821 })
822 .map(|meta| meta.task_id.clone())
823 .collect::<Vec<_>>()
824 })
825 .unwrap_or_default();
826
827 let mut cancelled = 0usize;
828 for task_id in root_task_ids {
829 cancelled += self.cancel_tree(&task_id).await;
830 }
831 cancelled
832 }
833
834 #[cfg(test)]
835 pub(crate) async fn persisted_snapshot(&self) -> HashMap<TaskId, PersistedTaskMeta> {
836 if let Some(store) = self.store()
837 && let Some(snap) = store.read::<BackgroundTaskStateKey>()
838 {
839 return snap.tasks;
840 }
841 HashMap::new()
842 }
843}
844
845impl Default for BackgroundTaskManager {
846 fn default() -> Self {
847 Self::new()
848 }
849}
850
851use awaken_contract::now_ms;