sayiir_core/task_index.rs
1//! O(1) `TaskId → metadata` lookups built once per `Workflow`.
2//!
3//! `find_task` and the metadata getters on [`WorkflowContinuation`] walk the
4//! continuation tree and recompute `TaskId::from(id.as_str())` (SHA-256) for
5//! every visited node. That's fine for occasional introspection, but it adds
6//! up on the dispatch hot path — every task claim does a `build_task_metadata`
7//! plus optional `get_task_timeout` lookup.
8//!
9//! [`TaskIndex`] flattens the tree once at workflow build time into a
10//! `HashMap<TaskId, TaskNodeMetadata>` so the per-dispatch lookups become a
11//! single hash-map probe.
12//!
13//! The index lives behind an `Arc` on [`Workflow`](crate::workflow::Workflow)
14//! and is shared cheaply with anything that needs the same view (the worker's
15//! `ExternalWorkflow`, the distributed runner's hot loop, etc.).
16
17use std::collections::HashMap;
18use std::sync::Arc;
19
20use crate::TaskId;
21use crate::task::RetryPolicy;
22use crate::workflow::WorkflowContinuation;
23
/// Metadata captured per indexed node, keyed by [`TaskId`] in a [`TaskIndex`].
///
/// [`TaskIndex::build`] creates one record for every `Task`, `Delay` and
/// `AwaitSignal` node it visits. Fields are crate-private; code outside the
/// crate reads them through the accessor methods on this type.
#[derive(Debug, Clone, Default)]
pub struct TaskNodeMetadata {
    /// Human-readable node id (the `id` argument given to the builder).
    ///
    /// `Arc<str>` so the FFI layer can clone it cheaply into
    /// `TaskExecutionContext` without re-allocating per dispatch.
    pub(crate) name: Arc<str>,
    /// Configured timeout, if any. [`TaskIndex::build`] only keeps a timeout
    /// for `Task` nodes; for every other node kind this is `None`.
    pub(crate) timeout: Option<std::time::Duration>,
    /// Configured priority (1–5; lower runs first), if any.
    pub(crate) priority: Option<u8>,
    /// Affinity tags for worker-pool routing; empty when none were set.
    pub(crate) tags: Vec<String>,
    /// Configured retry policy, if any.
    pub(crate) retry_policy: Option<RetryPolicy>,
    /// Optional version string.
    pub(crate) version: Option<String>,
}
38
39impl TaskNodeMetadata {
40 /// Human-readable node id (`Arc<str>` for cheap FFI cloning).
41 #[must_use]
42 pub fn name(&self) -> &Arc<str> {
43 &self.name
44 }
45 /// Configured timeout, if any.
46 #[must_use]
47 pub fn timeout(&self) -> Option<std::time::Duration> {
48 self.timeout
49 }
50 /// Configured priority (1–5; lower runs first).
51 #[must_use]
52 pub fn priority(&self) -> Option<u8> {
53 self.priority
54 }
55 /// Affinity tags for worker-pool routing.
56 #[must_use]
57 pub fn tags(&self) -> &[String] {
58 &self.tags
59 }
60 /// Configured retry policy.
61 #[must_use]
62 pub fn retry_policy(&self) -> Option<&RetryPolicy> {
63 self.retry_policy.as_ref()
64 }
65 /// Optional version string.
66 #[must_use]
67 pub fn version(&self) -> Option<&str> {
68 self.version.as_deref()
69 }
70}
71
/// O(1) lookup table from [`TaskId`] to per-node metadata.
///
/// Built once via [`TaskIndex::build`] at workflow construction. Stored behind
/// an `Arc` on [`Workflow`](crate::workflow::Workflow) and shared with
/// anything that needs to resolve `TaskId` → name / timeout / priority / tags
/// on the dispatch hot path.
///
/// The table holds one [`TaskNodeMetadata`] entry for every `Task`, `Delay`
/// and `AwaitSignal` node seen by [`TaskIndex::build`]. The `Default` value
/// is an empty index.
#[derive(Debug, Clone, Default)]
pub struct TaskIndex(HashMap<TaskId, TaskNodeMetadata>);
80
81impl TaskIndex {
82 /// Build the index by walking `continuation` once and hashing each `Task`
83 /// node's `id` to its [`TaskId`].
84 #[must_use]
85 pub fn build(continuation: &WorkflowContinuation) -> Self {
86 // Single traversal via `iter_nodes` — keeps tree-walking logic in one
87 // place so a new continuation variant only needs to teach `NodeIter`
88 // about itself, not a parallel walker. We index every id-carrying
89 // node (Task / Delay / AwaitSignal) so worker-side `contains` lookups
90 // succeed for delay and signal positions too.
91 let map: HashMap<TaskId, TaskNodeMetadata> = continuation
92 .iter_nodes()
93 .filter(|n| {
94 matches!(
95 n.kind,
96 crate::workflow::NodeKind::Task
97 | crate::workflow::NodeKind::Delay
98 | crate::workflow::NodeKind::AwaitSignal
99 )
100 })
101 .map(|n| {
102 let metadata = TaskNodeMetadata {
103 name: Arc::from(n.id),
104 timeout: n
105 .timeout
106 .filter(|_| n.kind == crate::workflow::NodeKind::Task),
107 priority: n.priority,
108 tags: n.tags.to_vec(),
109 retry_policy: n.retry_policy.cloned(),
110 version: n.version.map(str::to_owned),
111 };
112 (TaskId::from(n.id), metadata)
113 })
114 .collect();
115 Self(map)
116 }
117
118 /// `true` if the index knows about `task_id`.
119 #[must_use]
120 pub fn contains(&self, task_id: &TaskId) -> bool {
121 self.0.contains_key(task_id)
122 }
123
124 /// Borrow the metadata for `task_id`.
125 #[must_use]
126 pub fn get(&self, task_id: &TaskId) -> Option<&TaskNodeMetadata> {
127 self.0.get(task_id)
128 }
129
130 /// Human-readable node id for `task_id`, if any.
131 #[must_use]
132 pub fn name(&self, task_id: &TaskId) -> Option<&Arc<str>> {
133 self.0.get(task_id).map(|m| &m.name)
134 }
135
136 /// Look up the priority configured on `task_id`.
137 #[must_use]
138 pub fn priority(&self, task_id: &TaskId) -> Option<u8> {
139 self.0.get(task_id).and_then(|m| m.priority)
140 }
141
142 /// Look up the timeout configured on `task_id`.
143 #[must_use]
144 pub fn timeout(&self, task_id: &TaskId) -> Option<std::time::Duration> {
145 self.0.get(task_id).and_then(|m| m.timeout)
146 }
147
148 /// Look up the affinity tags configured on `task_id`.
149 ///
150 /// Returns a borrow into the index — callers that need to own the tags
151 /// clone explicitly (e.g. when feeding into `TaskHint::new`).
152 #[must_use]
153 pub fn tags(&self, task_id: &TaskId) -> &[String] {
154 self.0.get(task_id).map_or(&[], |m| m.tags.as_slice())
155 }
156
157 /// Look up the retry policy configured on `task_id`.
158 #[must_use]
159 pub fn retry_policy(&self, task_id: &TaskId) -> Option<&RetryPolicy> {
160 self.0.get(task_id).and_then(|m| m.retry_policy.as_ref())
161 }
162
163 /// Build a [`TaskMetadata`](crate::task::TaskMetadata) from the indexed
164 /// fields. Equivalent to
165 /// [`WorkflowContinuation::build_task_metadata`](crate::workflow::WorkflowContinuation::build_task_metadata)
166 /// but O(1) instead of O(N) tree walk + per-node SHA-256.
167 #[must_use]
168 pub fn build_task_metadata(&self, task_id: &TaskId) -> crate::task::TaskMetadata {
169 match self.0.get(task_id) {
170 Some(m) => crate::task::TaskMetadata::from_node_fields(
171 m.timeout,
172 m.retry_policy.clone(),
173 m.version.clone(),
174 m.priority,
175 m.tags.clone(),
176 ),
177 None => crate::task::TaskMetadata::default(),
178 }
179 }
180
181 /// Number of indexed task nodes.
182 #[must_use]
183 pub fn len(&self) -> usize {
184 self.0.len()
185 }
186
187 /// `true` when no task nodes are indexed.
188 #[must_use]
189 pub fn is_empty(&self) -> bool {
190 self.0.is_empty()
191 }
192}