1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
// (C) Copyright 2024- ECMWF and individual contributors.
//
// This software is licensed under the terms of the Apache Licence Version 2.0
// which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
// In applying this licence, ECMWF does not waive the privileges and immunities
// granted to it by virtue of its status as an intergovernmental organisation nor
// does it submit to any jurisdiction.
//! Watch state machine.
//!
//! This module contains the pure-logic state machine that drives a
//! watch session, as specified by ADRs D2 and D15. The state is the
//! orthogonal product of two axes,
//! [`ReplayPhase`] x [`ConnectionStatus`], and is advanced through a
//! single reducer. The reducer owns no I/O, no async runtime, and no
//! checkpoint state; downstream watch supervisors wire it to the
//! HTTP/SSE transport, the auth provider, and the [`crate::state`]
//! store.
//!
//! The public surface is accessible as `aviso::watch::*`. The
//! [`crate::AvisoClient::watch`] method returns a [`NotificationStream`]
//! whose supervisor task drives this reducer; the same supervisor also
//! backs [`crate::AvisoClient::watch_with_handler`].
//!
//! # Concepts
//!
//! - [`ReplayPhase`]: where the stream is in its replay-or-live
//! lifecycle. Starts in `Replaying { start, replay_completed: false }`
//! when constructed with a resume position, or in `Live` when
//! constructed without one.
//! - [`ConnectionStatus`]: the transport-level state of the SSE
//! connection (connected, reconnecting, waiting on backoff, refreshing
//! auth).
//! - [`WatchMode`]: whether the session is `Watch` (historical-then-live;
//! reconnects on `end_of_stream`) or `ReplayOnly` (terminates on
//! `end_of_stream` once the server's `replay_completed` event has been
//! received).
//! - [`WatchEvent`]: the input vocabulary the supervisor uses to drive
//! the reducer.
//! - [`WatchOutcome`]: the reducer's reply, telling the supervisor what
//! to do next (continue, reconnect with a [`ReconnectPolicy`], refresh
//! auth, surface a gap, or stop).
//!
//! # Transition summary
//!
//! The full transition table (one row per event x precondition pair)
//! lives in the commit message that introduced
//! `crates/aviso/src/watch/state.rs`. The summary below is the
//! reader-friendly view; the row numbers in `state.rs`'s match arms
//! refer to the same table.
//!
//! - `ConnectionEstablished`: `connection_status` -> `Connected`.
//! - `ConnectionLost { .. }` or `HeartbeatStarvation`: reconnect with
//! [`ReconnectPolicy::ExponentialBackoff`].
//! - `ServerClose { reason }`: dispatches on the reason.
//! `MaxDurationReached` reconnects immediately;
//! `ServerShutdown` applies a short backoff;
//! `EndOfStream` in [`WatchMode::Watch`] reconnects immediately;
//! `EndOfStream` in [`WatchMode::ReplayOnly`] terminates iff
//! `replay_completed` was already true, otherwise reconnects.
//! - `BackoffStarted(d)` records the duration in
//! [`ConnectionStatus::BackoffWait`]; `BackoffElapsed` returns the
//! reducer to `Reconnecting` only if currently waiting.
//! - `AuthRejected`: `connection_status` -> `RefreshingAuth`; outcome
//! [`WatchOutcome::RefreshAuth`].
//! `AuthRefreshCompleted { success: true }`: `connection_status` ->
//! `Reconnecting`.
//! `AuthRefreshCompleted { success: false }`: terminate with
//! [`FatalKind::AuthenticationRejectedAfterRefresh`].
//! - `HeartbeatReceived`, `NotificationReceived { .. }`: pure
//! observations, no state change.
//! - `ReplayCompleted`: in [`WatchMode::Watch`] from `Replaying` moves
//! to `Live`. In [`WatchMode::ReplayOnly`] from `Replaying { rc:
//! false }` flips `replay_completed` to true. Idempotent in every
//! other non-terminal phase.
//! - `GapDetected(reason)`: phase -> `GapDetected { reason }`; outcome
//! [`WatchOutcome::Gap`] (dedicated, not folded into `Continue`).
//! - `Fatal(kind)`: phase -> `Closed { Fatal { kind } }`; outcome
//! `Stop`.
//! - `Stop`: phase -> `Closed { UserRequested }`; outcome `Stop`.
//! - `Closed` is sticky: every subsequent event is a no-op returning
//! `Continue`.
//!
//! # Usage
//!
//! ```
//! use aviso::watch::{
//! ReconnectPolicy, ServerCloseReason, WatchEvent, WatchOutcome,
//! WatchState,
//! };
//!
//! // A live-only watch (no replay backlog) starts directly in
//! // `Live` with `Reconnecting` while the transport opens.
//! let mut state = WatchState::watch(None);
//!
//! // Supervisor establishes the transport.
//! let _ = state.transition(WatchEvent::ConnectionEstablished);
//!
//! // Server hits its `connection_max_duration_sec`. Routine close;
//! // reconnect immediately, no backoff.
//! let outcome = state.transition(WatchEvent::ServerClose {
//! reason: ServerCloseReason::MaxDurationReached,
//! });
//! assert_eq!(
//! outcome,
//! WatchOutcome::Reconnect {
//! policy: ReconnectPolicy::Immediate,
//! }
//! );
//!
//! // User asks the watch to stop.
//! let _ = state.transition(WatchEvent::Stop);
//! assert!(state.is_terminal());
//! ```
//!
//! # Cross-references
//!
//! - D2 (`plans/decisions.md`): reconnect-as-norm,
//! at-least-once delivery, state-machine sketch, reconnect classifier.
//! - D15: state machine is the orthogonal product `ReplayPhase x
//! ConnectionStatus`, single reducer, fields private.
//! - D9: a malformed `CloudEvent` id is terminal; surfaces here through
//! [`WatchEvent::Fatal`] with [`FatalKind::MalformedEvent`].
//! - D17: `from_date` is bootstrap-only and converts to a sequence
//! cursor after the first commit. The reducer does not own that
//! bookkeeping; the supervisor advances its own checkpoint state in
//! response to [`WatchEvent::NotificationReceived`].
pub use ;
pub use ;
pub use WatchMode;
pub use ;
pub use ;
pub use WatchRequest;
pub use WatchState;
pub use NotificationStream;
pub use CommandTriggerConfig;
pub use ;
pub use ;
pub use WireWatchRequest;