koprs_external/watcher.rs
1use std::fmt;
2use std::time::Duration;
3
4use futures::future::BoxFuture;
5use tokio::sync::mpsc;
6use tokio::task::JoinHandle;
7use tokio::time::sleep;
8use tracing::{info, warn};
9
10use crate::error::Result;
11
12// ---------------------------------------------------------------------------
13// ExternalEvent
14// ---------------------------------------------------------------------------
15
/// A change notification emitted by an [`ExternalSource`] each time it is
/// polled.
///
/// Every variant carries the item the change refers to. Deciding whether a
/// change counts as added, modified, or removed is the job of the source
/// implementation, which keeps whatever state it needs between polls.
///
/// Equality is derived, so two events compare equal when they are the same
/// variant and carry equal items. This makes events directly usable with
/// `assert_eq!` whenever the item type itself implements `PartialEq`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExternalEvent<T> {
    /// An item was observed for the first time.
    Added(T),
    /// An item that was previously observed has changed.
    Modified(T),
    /// An item that was previously observed is no longer present.
    Removed(T),
}
29
30// ---------------------------------------------------------------------------
31// ExternalSource
32// ---------------------------------------------------------------------------
33
34/// A source that can be polled for changes.
35///
36/// Implementations maintain their own state between calls so they can
37/// distinguish [`ExternalEvent::Added`], [`ExternalEvent::Modified`], and
38/// [`ExternalEvent::Removed`] events.
39///
40/// The returned [`BoxFuture`] must be `Send` — all built-in sources satisfy
41/// this requirement.
42///
43/// # Examples
44///
45/// ```
46/// use futures::future::BoxFuture;
47/// use koprs_external::error::Result;
48/// use koprs_external::watcher::{ExternalEvent, ExternalSource};
49///
50/// struct AlwaysTicks;
51///
52/// impl ExternalSource for AlwaysTicks {
53/// type Item = String;
54///
55/// fn poll(&mut self) -> BoxFuture<'_, Result<Vec<ExternalEvent<String>>>> {
56/// Box::pin(async move {
57/// Ok(vec![ExternalEvent::Added("tick".to_string())])
58/// })
59/// }
60///
61/// fn name(&self) -> &str { "always-ticks" }
62/// }
63/// ```
64pub trait ExternalSource: Send + 'static {
65 /// The item type produced by this source.
66 type Item: Send + Clone + fmt::Debug + 'static;
67
68 /// Poll the external source and return any change events since the last
69 /// call.
70 ///
71 /// Implementations track state internally so that repeated calls to
72 /// `poll` produce accurate `Added`, `Modified`, and `Removed` events
73 /// without requiring the caller to diff results.
74 fn poll(&mut self) -> BoxFuture<'_, Result<Vec<ExternalEvent<Self::Item>>>>;
75
76 /// A human-readable identifier for this source, used in log output.
77 fn name(&self) -> &str;
78}
79
80// ---------------------------------------------------------------------------
81// WatchConfig
82// ---------------------------------------------------------------------------
83
/// Timing parameters for [`watch_external_with_config`].
///
/// Holds the regular polling interval together with the upper bound that
/// limits how long exponential backoff may grow while polls keep failing.
///
/// # Examples
///
/// ```
/// use std::time::Duration;
/// use koprs_external::watcher::WatchConfig;
///
/// // Poll every 30 s; back off up to 10 min on errors.
/// let config = WatchConfig::new(Duration::from_secs(30))
///     .with_max_backoff(Duration::from_secs(600));
/// ```
#[derive(Debug, Clone)]
pub struct WatchConfig {
    /// How long to wait after a successful poll before polling again.
    pub interval: Duration,
    /// Longest wait allowed during backoff (default: `interval × 32`).
    pub max_backoff: Duration,
}

impl WatchConfig {
    /// Build a configuration from the base polling interval.
    ///
    /// The backoff ceiling is initialised to `interval × 32` — five
    /// doublings of the base — saturating instead of overflowing for very
    /// large intervals. Use [`with_max_backoff`][Self::with_max_backoff] to
    /// pick a different ceiling.
    pub fn new(interval: Duration) -> Self {
        let max_backoff = interval.saturating_mul(32);
        Self {
            interval,
            max_backoff,
        }
    }

    /// Replace the backoff ceiling, leaving the polling interval untouched.
    pub fn with_max_backoff(self, max_backoff: Duration) -> Self {
        Self {
            max_backoff,
            ..self
        }
    }
}
125
126// ---------------------------------------------------------------------------
127// Internal helpers
128// ---------------------------------------------------------------------------
129
/// Compute the retry wait after `consecutive_errors` failed polls in a row:
/// `base × 2^consecutive_errors`, never exceeding `max`.
fn backoff_wait(base: Duration, consecutive_errors: u32, max: Duration) -> Duration {
    // Clamp the exponent to 30 so the multiplier (at most 2^30) always fits
    // in a u32; the multiplication itself saturates at `Duration::MAX`.
    let exponent = consecutive_errors.min(30);
    let multiplier = 2u32.pow(exponent);
    let scaled = base.saturating_mul(multiplier);
    if scaled > max {
        max
    } else {
        scaled
    }
}
137
138// ---------------------------------------------------------------------------
139// watch_external_with_config
140// ---------------------------------------------------------------------------
141
142/// Spawn a background task that polls `source` according to `config` and
143/// forwards each [`ExternalEvent`] to `tx`.
144///
145/// **Exponential backoff**: consecutive poll failures increase the retry wait
146/// exponentially — starting at `config.interval`, doubling each time, capped
147/// at `config.max_backoff`. The wait resets to `config.interval` on the next
148/// successful poll.
149///
150/// The task shuts down automatically when all receivers are dropped or when
151/// the returned [`JoinHandle`] is aborted.
152///
153/// # Examples
154///
155/// ```no_run
156/// use std::time::Duration;
157/// use koprs_external::watcher::{WatchConfig, watch_external_with_config, ExternalSource};
158/// use tokio::sync::mpsc;
159///
160/// # async fn example<S: ExternalSource>(source: S) {
161/// let config = WatchConfig::new(Duration::from_secs(30))
162/// .with_max_backoff(Duration::from_secs(300));
163/// let (tx, mut rx) = mpsc::channel(16);
164/// let _handle = watch_external_with_config(source, config, tx);
165///
166/// while let Some(event) = rx.recv().await {
167/// println!("event: {:?}", event);
168/// }
169/// # }
170/// ```
171pub fn watch_external_with_config<S>(
172 source: S,
173 config: WatchConfig,
174 tx: mpsc::Sender<ExternalEvent<S::Item>>,
175) -> JoinHandle<()>
176where
177 S: ExternalSource,
178{
179 tokio::task::spawn(async move {
180 let mut source = source;
181 let name = source.name().to_owned();
182 let mut consecutive_errors: u32 = 0;
183
184 info!(source = %name, "External watcher started");
185
186 loop {
187 match source.poll().await {
188 Ok(events) => {
189 consecutive_errors = 0;
190 for event in events {
191 if tx.send(event).await.is_err() {
192 info!(source = %name, "Receiver dropped; stopping external watcher");
193 return;
194 }
195 }
196 sleep(config.interval).await;
197 }
198 Err(e) => {
199 consecutive_errors = consecutive_errors.saturating_add(1);
200 let wait =
201 backoff_wait(config.interval, consecutive_errors, config.max_backoff);
202 warn!(
203 source = %name,
204 error = %e,
205 attempt = consecutive_errors,
206 ?wait,
207 "Poll failed; retrying after backoff"
208 );
209 sleep(wait).await;
210 }
211 }
212 }
213 })
214}
215
216// ---------------------------------------------------------------------------
217// watch_external (convenience wrapper)
218// ---------------------------------------------------------------------------
219
220/// Spawn a background task that polls `source` every `interval` and forwards
221/// each [`ExternalEvent`] to `tx`.
222///
223/// This is a convenience wrapper around [`watch_external_with_config`] using
224/// [`WatchConfig::new`] with default backoff settings (`max_backoff =
225/// interval × 32`). Call [`watch_external_with_config`] directly to set a
226/// custom ceiling.
227///
228/// **Exponential backoff**: on consecutive poll errors the retry wait doubles
229/// starting from `interval`, capped at `interval × 32`. It resets to
230/// `interval` on the next success.
231///
232/// The task shuts down automatically when all receivers are dropped or when
233/// the returned [`JoinHandle`] is aborted.
234///
235/// # Examples
236///
237/// ```no_run
238/// use std::time::Duration;
239/// use koprs_external::watcher::{watch_external, ExternalSource};
240/// use tokio::sync::mpsc;
241///
242/// # async fn example<S: ExternalSource>(source: S) {
243/// let (tx, mut rx) = mpsc::channel(16);
244/// let _handle = watch_external(source, Duration::from_secs(30), tx);
245///
246/// while let Some(event) = rx.recv().await {
247/// println!("event: {:?}", event);
248/// }
249/// # }
250/// ```
251pub fn watch_external<S>(
252 source: S,
253 interval: Duration,
254 tx: mpsc::Sender<ExternalEvent<S::Item>>,
255) -> JoinHandle<()>
256where
257 S: ExternalSource,
258{
259 watch_external_with_config(source, WatchConfig::new(interval), tx)
260}