feldera_types/transport/clock.rs
1use std::cmp::max;
2
3use serde::{Deserialize, Serialize};
4use utoipa::ToSchema;
5
6#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize, ToSchema)]
7pub struct ClockConfig {
8 pub clock_resolution_usecs: u64,
9
10 /// Target value for `NOW()` at the worker's first emitted tick, in
11 /// milliseconds since the Unix epoch.
12 ///
13 /// Populated verbatim from `DevTweaks::now_offset` at endpoint
14 /// construction; the wall-clock delta is computed inside the
15 /// connector's worker task from a single `SystemTime::now()`
16 /// reading, so there is no drift between config construction and
17 /// the first emitted tick. `None` means no shift is applied.
18 #[serde(default, skip_serializing_if = "Option::is_none")]
19 pub now_offset_ms: Option<i64>,
20
21 /// If `true`, the clock does not advance on wall-clock cadence.
22 /// `NOW()` is held at its current value and only advances when an
23 /// external caller invokes the pipeline's `POST /clock/advance`
24 /// endpoint. Populated from `DevTweaks::now_http_driven`.
25 #[serde(default, skip_serializing_if = "std::ops::Not::not")]
26 pub http_driven: bool,
27}
28
29impl ClockConfig {
30 pub fn clock_resolution_ms(&self) -> u64 {
31 // Refuse to set 0 clock resolution.
32 max((self.clock_resolution_usecs + 500) / 1_000, 1)
33 }
34}
35
36/// Body of `POST /clock/advance`.
37///
38/// `delta_ms` is unsigned; negative values fail JSON deserialization.
39/// `Some(0)` reads the current `NOW()` without moving it or rounding
40/// it; `Some(n)` advances by `n` ms; `None` (`null` or omitted)
41/// advances by one `clock_resolution`. Non-zero values round up to
42/// the next `clock_resolution` boundary, so a sub-resolution delta
43/// still moves the clock by one full tick.
44#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
45pub struct ClockAdvanceRequest {
46 #[serde(default)]
47 pub delta_ms: Option<u64>,
48}
49
50/// Response of `POST /clock/advance`: the new `NOW()` value as both
51/// milliseconds since epoch (signed; pre-1970 anchors yield negative
52/// values) and an RFC 3339 string.
53#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
54pub struct ClockAdvanceResponse {
55 pub now_ms: i64,
56 pub now: String,
57}