Skip to main content

feldera_types/
completion_token.rs

1use serde::{Deserialize, Serialize};
2use utoipa::ToSchema;
3
4use crate::coordination::Step;
5
6/// Response to a completion token creation request.
7#[derive(Debug, Serialize, Deserialize, ToSchema)]
8pub struct CompletionTokenResponse {
9    /// Completion token.
10    ///
11    /// An opaque string associated with the current position in the input stream
12    /// generated by an input connector.
13    /// Pass this string to the `/completion_status` endpoint to check whether all
14    /// inputs associated with the token have been fully processed by the pipeline.
15    pub token: String,
16}
17
18impl CompletionTokenResponse {
19    pub fn new(token: String) -> Self {
20        Self { token }
21    }
22}
23
24/// URL-encoded arguments to the `/completion_status` endpoint.
25#[derive(Clone, Debug, PartialEq, Deserialize, Serialize, ToSchema)]
26pub struct CompletionStatusArgs {
27    /// Completion token returned by the `/completion_token` or `/ingress`
28    /// endpoint.
29    pub token: String,
30}
31
32/// Completion token status returned by the `/completion_status` endpoint.
33#[derive(Debug, Serialize, Deserialize, ToSchema, PartialEq, Eq)]
34pub enum CompletionStatus {
35    /// All inputs associated with the token have been processed to completion.
36    #[serde(rename = "complete")]
37    Complete,
38
39    /// The pipeline is still processing input updates associated with the token.
40    #[serde(rename = "inprogress")]
41    InProgress,
42}
43
44/// Response to a completion token status request.
45#[derive(Debug, Serialize, Deserialize, ToSchema)]
46pub struct CompletionStatusResponse {
47    /// Completion token status.
48    pub status: CompletionStatus,
49
50    /// If all of the data associated with the token has been processed through
51    /// the pipeline, this is the final step that includes at least one record.
52    /// When the pipeline's `total_completed_steps` reaches this value, the
53    /// token has been completed.
54    ///
55    /// This is `None` before the data associated with the token has been
56    /// processed through the pipeline.
57    #[schema(value_type = Option<u64>)]
58    pub step: Option<Step>,
59}
60
61impl CompletionStatusResponse {
62    pub fn new(step: Option<Step>, total_completed_steps: Step) -> Self {
63        let status = if let Some(step) = step
64            && total_completed_steps >= step
65        {
66            CompletionStatus::Complete
67        } else {
68            CompletionStatus::InProgress
69        };
70        Self { step, status }
71    }
72}