Skip to main content

feldera_types/transport/
clock.rs

1use std::cmp::max;
2
3use serde::{Deserialize, Serialize};
4use utoipa::ToSchema;
5
/// Configuration of the clock connector, which drives the value of `NOW()`.
///
/// The field order and serde attributes define the serialized form of this
/// type; optional settings are omitted from the output when unset.
#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize, ToSchema)]
pub struct ClockConfig {
    /// Clock resolution in microseconds.
    ///
    /// Use [`ClockConfig::clock_resolution_ms`] to obtain the value in
    /// milliseconds, rounded to the nearest millisecond and never less
    /// than 1.
    pub clock_resolution_usecs: u64,

    /// Target value for `NOW()` at the worker's first emitted tick, in
    /// milliseconds since the Unix epoch.
    ///
    /// Populated verbatim from `DevTweaks::now_offset` at endpoint
    /// construction; the wall-clock delta is computed inside the
    /// connector's worker task from a single `SystemTime::now()`
    /// reading, so there is no drift between config construction and
    /// the first emitted tick.  `None` means no shift is applied.
    ///
    /// Missing on input deserializes to `None`; `None` is omitted on output.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub now_offset_ms: Option<i64>,

    /// If `true`, the clock does not advance on wall-clock cadence.
    /// `NOW()` is held at its current value and only advances when an
    /// external caller invokes the pipeline's `POST /clock/advance`
    /// endpoint.  Populated from `DevTweaks::now_http_driven`.
    ///
    /// Missing on input deserializes to `false`.
    // `Not::not` yields `true` for a `false` field, so the field is left
    // out of the serialized output unless it is `true`.
    #[serde(default, skip_serializing_if = "std::ops::Not::not")]
    pub http_driven: bool,
}
28
29impl ClockConfig {
30    pub fn clock_resolution_ms(&self) -> u64 {
31        // Refuse to set 0 clock resolution.
32        max((self.clock_resolution_usecs + 500) / 1_000, 1)
33    }
34}
35
/// Body of `POST /clock/advance`.
///
/// `delta_ms` is unsigned; negative values fail JSON deserialization.
/// `Some(0)` reads the current `NOW()` without moving it or rounding
/// it; `Some(n)` advances by `n` ms; `None` (`null` or omitted)
/// advances by one `clock_resolution`.  Non-zero values round up to
/// the next `clock_resolution` boundary, so a sub-resolution delta
/// still moves the clock by one full tick.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
pub struct ClockAdvanceRequest {
    /// Requested advance in milliseconds.
    ///
    /// `#[serde(default)]` makes the field optional in the request body:
    /// when it is absent the value is `None`.
    #[serde(default)]
    pub delta_ms: Option<u64>,
}
49
/// Response of `POST /clock/advance`: the new `NOW()` value as both
/// milliseconds since epoch (signed; pre-1970 anchors yield negative
/// values) and an RFC 3339 string.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
pub struct ClockAdvanceResponse {
    /// New `NOW()` value in milliseconds since the Unix epoch; negative for
    /// instants before 1970.
    pub now_ms: i64,
    /// The same `NOW()` value formatted as an RFC 3339 string.
    pub now: String,
}