Skip to main content

sayiir_core/
task_index.rs

1//! O(1) `TaskId → metadata` lookups built once per `Workflow`.
2//!
3//! `find_task` and the metadata getters on [`WorkflowContinuation`] walk the
4//! continuation tree and recompute `TaskId::from(id.as_str())` (SHA-256) for
5//! every visited node. That's fine for occasional introspection, but it adds
6//! up on the dispatch hot path — every task claim does a `build_task_metadata`
7//! plus optional `get_task_timeout` lookup.
8//!
//! [`TaskIndex`] flattens the tree once at workflow build time into a
//! `HashMap<TaskId, TaskNodeMetadata>` — one entry per `Task`, `Delay`, and
//! `AwaitSignal` node — so the per-dispatch lookups become a single hash-map
//! probe.
12//!
13//! The index lives behind an `Arc` on [`Workflow`](crate::workflow::Workflow)
14//! and is shared cheaply with anything that needs the same view (the worker's
15//! `ExternalWorkflow`, the distributed runner's hot loop, etc.).
16
17use std::collections::HashMap;
18use std::sync::Arc;
19
20use crate::TaskId;
21use crate::task::RetryPolicy;
22use crate::workflow::WorkflowContinuation;
23
24/// Metadata captured per `Task` node, keyed by [`TaskId`] in a [`TaskIndex`].
25#[derive(Debug, Clone, Default)]
26pub struct TaskNodeMetadata {
27    /// Human-readable node id (the `id` argument given to the builder).
28    ///
29    /// `Arc<str>` so the FFI layer can clone it cheaply into
30    /// `TaskExecutionContext` without re-allocating per dispatch.
31    pub(crate) name: Arc<str>,
32    pub(crate) timeout: Option<std::time::Duration>,
33    pub(crate) priority: Option<u8>,
34    pub(crate) tags: Vec<String>,
35    pub(crate) retry_policy: Option<RetryPolicy>,
36    pub(crate) version: Option<String>,
37}
38
39impl TaskNodeMetadata {
40    /// Human-readable node id (`Arc<str>` for cheap FFI cloning).
41    #[must_use]
42    pub fn name(&self) -> &Arc<str> {
43        &self.name
44    }
45    /// Configured timeout, if any.
46    #[must_use]
47    pub fn timeout(&self) -> Option<std::time::Duration> {
48        self.timeout
49    }
50    /// Configured priority (1–5; lower runs first).
51    #[must_use]
52    pub fn priority(&self) -> Option<u8> {
53        self.priority
54    }
55    /// Affinity tags for worker-pool routing.
56    #[must_use]
57    pub fn tags(&self) -> &[String] {
58        &self.tags
59    }
60    /// Configured retry policy.
61    #[must_use]
62    pub fn retry_policy(&self) -> Option<&RetryPolicy> {
63        self.retry_policy.as_ref()
64    }
65    /// Optional version string.
66    #[must_use]
67    pub fn version(&self) -> Option<&str> {
68        self.version.as_deref()
69    }
70}
71
72/// O(1) lookup table from [`TaskId`] to per-node metadata.
73///
74/// Built once via [`TaskIndex::build`] at workflow construction. Stored behind
75/// an `Arc` on [`Workflow`](crate::workflow::Workflow) and shared with
76/// anything that needs to resolve `TaskId` → name / timeout / priority / tags
77/// on the dispatch hot path.
78#[derive(Debug, Clone, Default)]
79pub struct TaskIndex(HashMap<TaskId, TaskNodeMetadata>);
80
81impl TaskIndex {
82    /// Build the index by walking `continuation` once and hashing each `Task`
83    /// node's `id` to its [`TaskId`].
84    #[must_use]
85    pub fn build(continuation: &WorkflowContinuation) -> Self {
86        // Single traversal via `iter_nodes` — keeps tree-walking logic in one
87        // place so a new continuation variant only needs to teach `NodeIter`
88        // about itself, not a parallel walker. We index every id-carrying
89        // node (Task / Delay / AwaitSignal) so worker-side `contains` lookups
90        // succeed for delay and signal positions too.
91        let map: HashMap<TaskId, TaskNodeMetadata> = continuation
92            .iter_nodes()
93            .filter(|n| {
94                matches!(
95                    n.kind,
96                    crate::workflow::NodeKind::Task
97                        | crate::workflow::NodeKind::Delay
98                        | crate::workflow::NodeKind::AwaitSignal
99                )
100            })
101            .map(|n| {
102                let metadata = TaskNodeMetadata {
103                    name: Arc::from(n.id),
104                    timeout: n
105                        .timeout
106                        .filter(|_| n.kind == crate::workflow::NodeKind::Task),
107                    priority: n.priority,
108                    tags: n.tags.to_vec(),
109                    retry_policy: n.retry_policy.cloned(),
110                    version: n.version.map(str::to_owned),
111                };
112                (TaskId::from(n.id), metadata)
113            })
114            .collect();
115        Self(map)
116    }
117
118    /// `true` if the index knows about `task_id`.
119    #[must_use]
120    pub fn contains(&self, task_id: &TaskId) -> bool {
121        self.0.contains_key(task_id)
122    }
123
124    /// Borrow the metadata for `task_id`.
125    #[must_use]
126    pub fn get(&self, task_id: &TaskId) -> Option<&TaskNodeMetadata> {
127        self.0.get(task_id)
128    }
129
130    /// Human-readable node id for `task_id`, if any.
131    #[must_use]
132    pub fn name(&self, task_id: &TaskId) -> Option<&Arc<str>> {
133        self.0.get(task_id).map(|m| &m.name)
134    }
135
136    /// Look up the priority configured on `task_id`.
137    #[must_use]
138    pub fn priority(&self, task_id: &TaskId) -> Option<u8> {
139        self.0.get(task_id).and_then(|m| m.priority)
140    }
141
142    /// Look up the timeout configured on `task_id`.
143    #[must_use]
144    pub fn timeout(&self, task_id: &TaskId) -> Option<std::time::Duration> {
145        self.0.get(task_id).and_then(|m| m.timeout)
146    }
147
148    /// Look up the affinity tags configured on `task_id`.
149    ///
150    /// Returns a borrow into the index — callers that need to own the tags
151    /// clone explicitly (e.g. when feeding into `TaskHint::new`).
152    #[must_use]
153    pub fn tags(&self, task_id: &TaskId) -> &[String] {
154        self.0.get(task_id).map_or(&[], |m| m.tags.as_slice())
155    }
156
157    /// Look up the retry policy configured on `task_id`.
158    #[must_use]
159    pub fn retry_policy(&self, task_id: &TaskId) -> Option<&RetryPolicy> {
160        self.0.get(task_id).and_then(|m| m.retry_policy.as_ref())
161    }
162
163    /// Build a [`TaskMetadata`](crate::task::TaskMetadata) from the indexed
164    /// fields. Equivalent to
165    /// [`WorkflowContinuation::build_task_metadata`](crate::workflow::WorkflowContinuation::build_task_metadata)
166    /// but O(1) instead of O(N) tree walk + per-node SHA-256.
167    #[must_use]
168    pub fn build_task_metadata(&self, task_id: &TaskId) -> crate::task::TaskMetadata {
169        match self.0.get(task_id) {
170            Some(m) => crate::task::TaskMetadata::from_node_fields(
171                m.timeout,
172                m.retry_policy.clone(),
173                m.version.clone(),
174                m.priority,
175                m.tags.clone(),
176            ),
177            None => crate::task::TaskMetadata::default(),
178        }
179    }
180
181    /// Number of indexed task nodes.
182    #[must_use]
183    pub fn len(&self) -> usize {
184        self.0.len()
185    }
186
187    /// `true` when no task nodes are indexed.
188    #[must_use]
189    pub fn is_empty(&self) -> bool {
190        self.0.is_empty()
191    }
192}