Skip to main content

rustvello_core/
invocation.rs

1//! Invocation types for tracking task execution.
2//!
3//! Three types form a unified invocation model:
4//!
5//! - [`InvocationHandle`] — distributed execution handle (status via backends)
6//! - [`SyncInvocation`] — sync execution result (status in-memory)
7//! - [`Invocation`] — unified enum combining both
8
9use std::marker::PhantomData;
10use std::sync::Arc;
11use std::time::Duration;
12
13use serde::de::DeserializeOwned;
14
15use rustvello_proto::identifiers::InvocationId;
16use rustvello_proto::status::InvocationStatus;
17
18use crate::error::{RustvelloError, RustvelloResult};
19use crate::orchestrator::Orchestrator;
20use crate::state_backend::StateBackend;
21
22/// Handle to a submitted invocation with typed result access.
23///
24/// Mirrors pynenc's `BaseInvocation` hierarchy. Provides:
25/// - Status checking (non-blocking)
26/// - Typed result retrieval (blocking or polling)
27/// - Access to the invocation identity
28///
29/// # Example
30///
31/// ```rust,no_run
32/// use rustvello_core::invocation::InvocationHandle;
33///
34/// // After submitting a task:
35/// // let handle: InvocationHandle<i32> = app.submit(&my_task, params).await?;
36/// // let status = handle.status().await?;
37/// // let result: i32 = handle.result().await?;
38/// ```
39pub struct InvocationHandle<R: DeserializeOwned = String> {
40    invocation_id: InvocationId,
41    orchestrator: Arc<dyn Orchestrator>,
42    state_backend: Arc<dyn StateBackend>,
43    _result_type: PhantomData<R>,
44}
45
46impl<R: DeserializeOwned> InvocationHandle<R> {
47    /// Create a new handle from raw parts.
48    pub fn new(
49        invocation_id: InvocationId,
50        orchestrator: Arc<dyn Orchestrator>,
51        state_backend: Arc<dyn StateBackend>,
52    ) -> Self {
53        Self {
54            invocation_id,
55            orchestrator,
56            state_backend,
57            _result_type: PhantomData,
58        }
59    }
60
61    /// Get the invocation's unique identifier.
62    pub fn invocation_id(&self) -> &InvocationId {
63        &self.invocation_id
64    }
65
66    /// Get the current status of this invocation.
67    pub async fn status(&self) -> RustvelloResult<InvocationStatus> {
68        let record = self
69            .orchestrator
70            .get_invocation_status(&self.invocation_id)
71            .await?;
72        Ok(record.status)
73    }
74
75    /// Check if the invocation has finished (success or failure).
76    pub async fn is_done(&self) -> RustvelloResult<bool> {
77        Ok(self.status().await?.is_terminal())
78    }
79
80    /// Get the typed result of a completed invocation.
81    ///
82    /// Returns an error if the invocation is not yet complete or failed.
83    pub async fn result(&self) -> RustvelloResult<R> {
84        let status = self.status().await?;
85        match status {
86            InvocationStatus::Success => {
87                let raw = self
88                    .state_backend
89                    .get_result(&self.invocation_id)
90                    .await?
91                    .ok_or_else(|| RustvelloError::Internal {
92                        message: format!(
93                            "invocation {} has SUCCESS status but no stored result",
94                            self.invocation_id
95                        ),
96                    })?;
97                serde_json::from_str(&raw).map_err(|e| RustvelloError::Serialization {
98                    message: e.to_string(),
99                })
100            }
101            InvocationStatus::Failed => {
102                let err = self.state_backend.get_error(&self.invocation_id).await?;
103                Err(RustvelloError::runner_err(err.map_or_else(
104                    || "unknown error".to_string(),
105                    |e| e.to_string(),
106                )))
107            }
108            other => Err(RustvelloError::Internal {
109                message: format!(
110                    "invocation {} is not finished (status: {})",
111                    self.invocation_id, other
112                ),
113            }),
114        }
115    }
116
117    /// Wait for the invocation to complete, polling at the given interval.
118    ///
119    /// Returns the typed result once the invocation reaches a terminal state.
120    ///
121    /// **Note:** Uses a fixed poll interval with no backoff. For long-running
122    /// tasks, prefer a longer interval (e.g., 500ms–2s) to reduce backend load.
123    pub async fn wait(&self, poll_interval: Duration) -> RustvelloResult<R> {
124        loop {
125            if self.is_done().await? {
126                return self.result().await;
127            }
128            tokio::time::sleep(poll_interval).await;
129        }
130    }
131
132    /// Wait for the invocation to complete with a timeout.
133    ///
134    /// Returns `Err(Timeout)` if the invocation does not complete within the given duration.
135    pub async fn wait_timeout(
136        &self,
137        timeout: Duration,
138        poll_interval: Duration,
139    ) -> RustvelloResult<R> {
140        tokio::time::timeout(timeout, self.wait(poll_interval))
141            .await
142            .map_err(|_| {
143                RustvelloError::runner_err(format!(
144                    "timeout waiting for invocation {}",
145                    self.invocation_id
146                ))
147            })?
148    }
149
150    /// Erase the result type, returning a handle that yields raw JSON strings.
151    pub fn into_untyped(self) -> InvocationHandle<String> {
152        InvocationHandle {
153            invocation_id: self.invocation_id,
154            orchestrator: self.orchestrator,
155            state_backend: self.state_backend,
156            _result_type: PhantomData,
157        }
158    }
159}
160
161impl<R: DeserializeOwned> std::fmt::Debug for InvocationHandle<R> {
162    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
163        f.debug_struct("InvocationHandle")
164            .field("invocation_id", &self.invocation_id)
165            .finish()
166    }
167}
168
169// ---------------------------------------------------------------------------
170// SyncInvocation — mirrors pynenc's ConcurrentInvocation
171// ---------------------------------------------------------------------------
172
173/// Synchronous invocation result — created when `dev_mode_force_sync = true`.
174///
175/// Mirrors pynenc's `ConcurrentInvocation`. The task has already been executed
176/// (with retries if configured); this wrapper provides the same interface as
177/// [`InvocationHandle`] for unified API usage via [`Invocation`].
178///
179/// No backend interaction — status and result are held in memory.
180pub struct SyncInvocation<R> {
181    invocation_id: InvocationId,
182    status: InvocationStatus,
183    result: Result<R, RustvelloError>,
184}
185
186impl<R> SyncInvocation<R> {
187    /// Create a successful sync invocation.
188    pub fn success(invocation_id: InvocationId, result: R) -> Self {
189        Self {
190            invocation_id,
191            status: InvocationStatus::Success,
192            result: Ok(result),
193        }
194    }
195
196    /// Create a failed sync invocation.
197    pub fn failed(invocation_id: InvocationId, error: RustvelloError) -> Self {
198        Self {
199            invocation_id,
200            status: InvocationStatus::Failed,
201            result: Err(error),
202        }
203    }
204
205    /// Get the invocation's unique identifier.
206    pub fn invocation_id(&self) -> &InvocationId {
207        &self.invocation_id
208    }
209
210    /// Get the status (always terminal for sync invocations).
211    pub fn status(&self) -> InvocationStatus {
212        self.status
213    }
214
215    /// Check if done (always true for sync invocations).
216    pub fn is_done(&self) -> bool {
217        true
218    }
219}
220
221impl<R> std::fmt::Debug for SyncInvocation<R> {
222    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
223        f.debug_struct("SyncInvocation")
224            .field("invocation_id", &self.invocation_id)
225            .field("status", &self.status)
226            .finish()
227    }
228}
229
230// ---------------------------------------------------------------------------
231// Invocation<R> — unified enum for both execution paths
232// ---------------------------------------------------------------------------
233
234/// Unified invocation type — same API for sync and distributed execution.
235///
236/// Created by `app.call()`. The caller doesn't need to know whether the task
237/// was executed synchronously or routed through the broker.
238///
239/// Matches pynenc's pattern where `Task._call()` returns either a
240/// `ConcurrentInvocation` or `DistributedInvocation` through a common
241/// `BaseInvocation` interface.
242#[non_exhaustive]
243pub enum Invocation<R: DeserializeOwned> {
244    /// Sync execution (dev mode). Result is already computed.
245    Sync(SyncInvocation<R>),
246    /// Distributed execution. Result available after runner processes it.
247    Distributed(InvocationHandle<R>),
248}
249
250impl<R: DeserializeOwned> Invocation<R> {
251    /// Get the invocation's unique identifier.
252    pub fn invocation_id(&self) -> &InvocationId {
253        match self {
254            Self::Sync(s) => s.invocation_id(),
255            Self::Distributed(d) => d.invocation_id(),
256        }
257    }
258
259    /// Get the current status of this invocation.
260    pub async fn status(&self) -> RustvelloResult<InvocationStatus> {
261        match self {
262            Self::Sync(s) => Ok(s.status()),
263            Self::Distributed(d) => d.status().await,
264        }
265    }
266
267    /// Check if the invocation has finished (success or failure).
268    pub async fn is_done(&self) -> RustvelloResult<bool> {
269        match self {
270            Self::Sync(s) => Ok(s.is_done()),
271            Self::Distributed(d) => d.is_done().await,
272        }
273    }
274
275    /// Get the typed result.
276    ///
277    /// For sync invocations, returns immediately.
278    /// For distributed invocations, may fail if not yet complete.
279    pub async fn result(self) -> RustvelloResult<R> {
280        match self {
281            Self::Sync(s) => s.result,
282            Self::Distributed(d) => d.result().await,
283        }
284    }
285
286    /// Wait for the invocation to complete, polling at the given interval.
287    pub async fn wait(self, poll_interval: Duration) -> RustvelloResult<R> {
288        match self {
289            Self::Sync(s) => s.result,
290            Self::Distributed(d) => d.wait(poll_interval).await,
291        }
292    }
293
294    /// Wait with a timeout.
295    pub async fn wait_timeout(
296        self,
297        timeout: Duration,
298        poll_interval: Duration,
299    ) -> RustvelloResult<R> {
300        match self {
301            Self::Sync(s) => s.result,
302            Self::Distributed(d) => d.wait_timeout(timeout, poll_interval).await,
303        }
304    }
305
306    /// Returns `true` if this is a sync invocation.
307    pub fn is_sync(&self) -> bool {
308        matches!(self, Self::Sync(_))
309    }
310
311    /// Returns `true` if this is a distributed invocation.
312    pub fn is_distributed(&self) -> bool {
313        matches!(self, Self::Distributed(_))
314    }
315}
316
317impl<R: DeserializeOwned> std::fmt::Debug for Invocation<R> {
318    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
319        match self {
320            Self::Sync(s) => f.debug_tuple("Invocation::Sync").field(s).finish(),
321            Self::Distributed(d) => f.debug_tuple("Invocation::Distributed").field(d).finish(),
322        }
323    }
324}