feldera_types/completion_token.rs
1use serde::{Deserialize, Serialize};
2use utoipa::ToSchema;
3
4use crate::coordination::Step;
5
6/// Response to a completion token creation request.
7#[derive(Debug, Serialize, Deserialize, ToSchema)]
8pub struct CompletionTokenResponse {
9 /// Completion token.
10 ///
11 /// An opaque string associated with the current position in the input stream
12 /// generated by an input connector.
13 /// Pass this string to the `/completion_status` endpoint to check whether all
14 /// inputs associated with the token have been fully processed by the pipeline.
15 pub token: String,
16}
17
18impl CompletionTokenResponse {
19 pub fn new(token: String) -> Self {
20 Self { token }
21 }
22}
23
24/// URL-encoded arguments to the `/completion_status` endpoint.
25#[derive(Clone, Debug, PartialEq, Deserialize, Serialize, ToSchema)]
26pub struct CompletionStatusArgs {
27 /// Completion token returned by the `/completion_token` or `/ingress`
28 /// endpoint.
29 pub token: String,
30}
31
32/// Completion token status returned by the `/completion_status` endpoint.
33#[derive(Debug, Serialize, Deserialize, ToSchema, PartialEq, Eq)]
34pub enum CompletionStatus {
35 /// All inputs associated with the token have been processed to completion.
36 #[serde(rename = "complete")]
37 Complete,
38
39 /// The pipeline is still processing input updates associated with the token.
40 #[serde(rename = "inprogress")]
41 InProgress,
42}
43
44/// Response to a completion token status request.
45#[derive(Debug, Serialize, Deserialize, ToSchema)]
46pub struct CompletionStatusResponse {
47 /// Completion token status.
48 pub status: CompletionStatus,
49
50 /// If all of the data associated with the token has been processed through
51 /// the pipeline, this is the final step that includes at least one record.
52 /// When the pipeline's `total_completed_steps` reaches this value, the
53 /// token has been completed.
54 ///
55 /// This is `None` before the data associated with the token has been
56 /// processed through the pipeline.
57 #[schema(value_type = Option<u64>)]
58 pub step: Option<Step>,
59}
60
61impl CompletionStatusResponse {
62 pub fn new(step: Option<Step>, total_completed_steps: Step) -> Self {
63 let status = if let Some(step) = step
64 && total_completed_steps >= step
65 {
66 CompletionStatus::Complete
67 } else {
68 CompletionStatus::InProgress
69 };
70 Self { step, status }
71 }
72}