Skip to main content

feldera_types/
transaction.rs

1use std::fmt::{Display, Formatter};
2
3use serde::{Deserialize, Serialize};
4use utoipa::ToSchema;
5
6pub type TransactionId = i64;
7
8/// Response to a `/start_transaction` request.
9#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
10pub struct StartTransactionResponse {
11    pub transaction_id: i64,
12}
13
14impl StartTransactionResponse {
15    pub fn new(transaction_id: TransactionId) -> Self {
16        Self { transaction_id }
17    }
18}
19
20/// Summary of transaction commit progress.
21#[derive(Clone, Debug, Default, Serialize, Deserialize, ToSchema)]
22pub struct CommitProgressSummary {
23    /// Number of operators that have been fully flushed.
24    pub completed: u64,
25
26    /// Number of operators that are currently being flushed.
27    pub in_progress: u64,
28
29    /// Number of operators that haven't started flushing.
30    pub remaining: u64,
31
32    /// Number of records processed by operators that are currently being flushed.
33    pub in_progress_processed_records: u64,
34
35    /// Total number of records that operators that are currently being flushed need to process.
36    pub in_progress_total_records: u64,
37}
38
39impl CommitProgressSummary {
40    pub fn new() -> Self {
41        Self::default()
42    }
43
44    pub fn merge(&mut self, other: &CommitProgressSummary) {
45        self.completed += other.completed;
46        self.in_progress += other.in_progress;
47        self.remaining += other.remaining;
48        self.in_progress_processed_records += other.in_progress_processed_records;
49        self.in_progress_total_records += other.in_progress_total_records;
50    }
51}
52
53impl Display for CommitProgressSummary {
54    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
55        write!(
56            f,
57            "completed: {} operators, evaluating: {} operators [{}/{} changes processed], remaining: {} operators",
58            self.completed,
59            self.in_progress,
60            self.in_progress_processed_records,
61            self.in_progress_total_records,
62            self.remaining
63        )
64    }
65}