// (C) Copyright 2024- ECMWF and individual contributors.
//
// This software is licensed under the terms of the Apache Licence Version 2.0
// which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
// In applying this licence, ECMWF does not waive the privileges and immunities
// granted to it by virtue of its status as an intergovernmental organisation nor
// does it submit to any jurisdiction.
//! Persistence for watch resume state.
//!
//! [`StateStore`] is an async trait for persisting and retrieving
//! [`Checkpoint`]s keyed by [`ResumeKey`], consumed by the aviso
//! watch supervisor for at-least-once delivery across reconnects and
//! process restarts. Implementations are `Send + Sync` and serialise
//! concurrent writes internally so the in-memory and (where
//! applicable) on-disk states stay consistent.
//!
//! A successful [`StateStore::put`] is committed-before-visible: a
//! subsequent [`StateStore::get`] returns the resolved value, that is,
//! the checkpoint that survives the monotonic comparison described
//! below. For durable implementations (such as [`JsonFileStore`]) the
//! guarantee strengthens to durable-before-visible: the disk write
//! returns success before the new value becomes visible to readers. A
//! failed `put` leaves all state unchanged.
//!
//! `put` is **strictly monotonic** in `last_committed_sequence`: a
//! put with a sequence less than or equal to the existing value is a
//! silent no-op. Both implementations enforce this so consumers see
//! identical behaviour regardless of the backing store. Callers
//! needing to reset a checkpoint to a lower sequence must
//! [`delete`](StateStore::delete) first, then `put`.
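//!
//! The monotonic rule can be sketched with a plain map. This is an
//! illustration only: `SketchStore`, its `String` keys and its bare
//! `u64` sequence values are hypothetical stand-ins, not the crate's
//! [`MemoryStore`], [`ResumeKey`] or [`Checkpoint`], and the sketch is
//! synchronous where the real trait is async.
//!
//! ```rust
//! use std::collections::HashMap;
//!
//! /// Hypothetical stand-in for a store; not the crate's `MemoryStore`.
//! #[derive(Default)]
//! struct SketchStore {
//!     /// Last committed sequence per key.
//!     committed: HashMap<String, u64>,
//! }
//!
//! impl SketchStore {
//!     /// Stores `seq` only when it is strictly greater than the
//!     /// sequence already held for `key`; otherwise does nothing.
//!     fn put(&mut self, key: &str, seq: u64) {
//!         match self.committed.get(key) {
//!             // Less than or equal to the existing value: silent no-op.
//!             Some(&current) if seq <= current => {}
//!             // No entry yet, or a strictly greater sequence: commit.
//!             _ => {
//!                 self.committed.insert(key.to_owned(), seq);
//!             }
//!         }
//!     }
//!
//!     /// Returns the last committed sequence for `key`, if any.
//!     fn get(&self, key: &str) -> Option<u64> {
//!         self.committed.get(key).copied()
//!     }
//!
//!     /// Removes the entry so that a lower sequence can be stored next.
//!     fn delete(&mut self, key: &str) {
//!         self.committed.remove(key);
//!     }
//! }
//! ```
//!
//! In this sketch, `put("k", 10)` followed by `put("k", 7)` or
//! `put("k", 10)` leaves 10 in place; only after `delete("k")` does
//! `put("k", 7)` store 7.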
//!
//! Two implementations are provided:
//!
//! - [`MemoryStore`]: in-process. State dies with the program. Good
//!   for tests and short-lived consumers.
//! - [`JsonFileStore`]: backed by a JSON file with crash-safe atomic
//!   writes via the `atomicwrites` crate plus a cross-process
//!   advisory lock on a sidecar lockfile. Safe for cooperating
//!   processes on local filesystems; see the type docs for the
//!   precondition list (no NFS/CIFS, no external deletion of the
//!   lockfile).
//!
//! Resume keys are derived from a base URL, an event type, the watch
//! filter body, and an optional schema fingerprint (D3 in the ADR log).
//! The hash deliberately excludes any server-side resume position; the
//! same logical subscription always computes the same key.
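//!
//! As a rough sketch of that derivation, the four inputs can be fed
//! into a deterministic hash. Everything here is an assumption made
//! for illustration: these docs do not say which hash function or
//! field encoding [`ResumeKey`] uses, so the example substitutes a
//! hand-written 64-bit FNV-1a with length-prefixed fields, and
//! `sketch_resume_key` is a hypothetical name.
//!
//! ```rust
//! /// Standard 64-bit FNV-1a offset basis and prime.
//! const FNV_OFFSET: u64 = 0xcbf2_9ce4_8422_2325;
//! const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;
//!
//! /// Folds `bytes` into a running FNV-1a hash.
//! fn fnv1a(mut hash: u64, bytes: &[u8]) -> u64 {
//!     for &b in bytes {
//!         hash ^= u64::from(b);
//!         hash = hash.wrapping_mul(FNV_PRIME);
//!     }
//!     hash
//! }
//!
//! /// Hashes a field preceded by its length, so that shifting bytes
//! /// between adjacent fields changes the hashed byte stream.
//! fn mix_field(hash: u64, field: &[u8]) -> u64 {
//!     let hash = fnv1a(hash, &(field.len() as u64).to_le_bytes());
//!     fnv1a(hash, field)
//! }
//!
//! /// Hypothetical key derivation. No server-side resume position is
//! /// passed in, so the same subscription always yields the same key.
//! fn sketch_resume_key(
//!     base_url: &str,
//!     event_type: &str,
//!     filter_body: &str,
//!     schema_fingerprint: Option<&str>,
//! ) -> u64 {
//!     let mut hash = FNV_OFFSET;
//!     hash = mix_field(hash, base_url.as_bytes());
//!     hash = mix_field(hash, event_type.as_bytes());
//!     hash = mix_field(hash, filter_body.as_bytes());
//!     // A tag byte distinguishes a missing fingerprint from an empty one.
//!     match schema_fingerprint {
//!         Some(fingerprint) => {
//!             hash = fnv1a(hash, &[1]);
//!             mix_field(hash, fingerprint.as_bytes())
//!         }
//!         None => fnv1a(hash, &[0]),
//!     }
//! }
//! ```
//!
//! A hand-written hash is used in the sketch because a key that is
//! persisted across process restarts should not depend on a hasher
//! whose algorithm may change between toolchain versions.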
pub use Checkpoint;
pub use StoreError;
pub use JsonFileStore;
pub use MemoryStore;
pub use ;
use async_trait;
/// Persistent storage for [`Checkpoint`] keyed by [`ResumeKey`].
///
/// Implementations are `Send + Sync` and serialise concurrent writes
/// internally. See the [module docs](self) for the strict-monotonic
/// `put` contract and the multi-writer concurrency notes that
/// distinguish [`MemoryStore`] from [`JsonFileStore`].
///
/// # Cancel-safety expectation for watch supervisor consumers
///
/// The watch supervisor in [`crate::AvisoClient::watch`] consumes this
/// trait with an asymmetric cancel-safety contract:
///
/// - [`Self::get`] MAY be cancelled (the future MAY be dropped
///   mid-flight). The supervisor races the initial cursor-load `get`
///   against per-stream drop and parent drop via `tokio::select!`, so
///   a long-running `get` that overlaps a drop terminates promptly.
///   Implementations must be safe to drop mid-`await`: dropping the
///   future must not corrupt internal state, leak resources beyond
///   what `Drop` cleans up, or break invariants for the next `get` or
///   `put` against the same key. Pure I/O-bound implementations
///   (memory map, file read, network round trip) usually satisfy this
///   trivially; implementations that hold a partially-completed
///   internal transaction across the `await` boundary must release it
///   on drop.
///
/// - [`Self::put`] is NOT cancelled. The supervisor lets in-progress
///   puts run to completion so the underlying durable state is never
///   left half-written. Implementations may rely on atomic completion
///   within their own `await` lifetime. The trade-off is bounded extra
///   exit latency proportional to the `put` duration on parent drop.
///
/// Implementations SHOULD keep both methods bounded in wall-clock time
/// (the shipped [`JsonFileStore`] is dominated by `fsync`, typically
/// tens of milliseconds on local disk). A custom store that performs
/// an unbounded network call from within `put` will extend the drop
/// latency of `AvisoClient` by that amount; if that is unacceptable,
/// the implementation may apply its own internal timeout and return
/// [`StoreError`] on expiry. The supervisor will surface that timeout
/// as [`crate::ClientError::StateStore`] and terminate the watch.