zeph-core 0.19.0

Core agent loop, configuration, context builder, metrics, and vault for Zeph
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
// SPDX-License-Identifier: MIT OR Apache-2.0

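//! Instrumented wrappers around tokio channels.
//!
//! When the `profiling` feature is disabled, all wrappers delegate
//! directly to the inner channel with no counters, no timing, and no
//! additional allocations; each wrapper only carries its channel name.
//!
//! When `profiling` is enabled, each successful `send`/`recv` call on a
//! wrapper updates an atomic counter, and sampled `tracing::trace!` events
//! named `channel.metrics` report channel-level metrics such as the sent
//! count and, for bounded channels, queue depth and backpressure latency.
//! `try_send` and any access through `inner`, `inner_mut`, or `into_inner`
//! bypass instrumentation.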

#[cfg(feature = "profiling")]
use std::sync::atomic::{AtomicU64, Ordering};
use tokio::sync::mpsc;

/// Bounded-channel sender that wraps [`mpsc::Sender`] with optional metrics.
///
/// With the `profiling` feature enabled, the wrapper counts sent messages and
/// emits a `tracing::trace!` event on every 16th send carrying the current
/// queue depth and how long that `send` call took (its backpressure latency).
/// Without the feature it is a thin pass-through to the inner sender.
pub struct InstrumentedSender<T> {
    /// The wrapped tokio sender that every operation delegates to.
    inner: mpsc::Sender<T>,
    /// Channel label reported in trace events.
    name: &'static str,
    /// Send counter owned by this handle; clones start again from zero.
    #[cfg(feature = "profiling")]
    sent: AtomicU64,
}

impl<T> InstrumentedSender<T> {
    /// Wrap an existing [`mpsc::Sender`] in an instrumented sender.
    ///
    /// Use this when the sender is constructed outside the
    /// [`instrumented_channel`] constructor. This mirrors
    /// [`InstrumentedReceiver::from_receiver`] and
    /// [`InstrumentedUnboundedSender::from_sender`].
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio::sync::mpsc;
    /// use zeph_core::instrumented_channel::InstrumentedSender;
    ///
    /// let (tx, _rx) = mpsc::channel::<u32>(4);
    /// let _instrumented = InstrumentedSender::from_sender(tx, "my-channel");
    /// ```
    #[must_use]
    pub fn from_sender(inner: mpsc::Sender<T>, name: &'static str) -> Self {
        Self {
            inner,
            name,
            #[cfg(feature = "profiling")]
            sent: AtomicU64::new(0),
        }
    }

    /// Send a value, waiting for capacity if the channel is full.
    ///
    /// Behaves identically to [`mpsc::Sender::send`]. With `profiling`
    /// enabled, each successful send increments this handle's `sent` counter,
    /// and every 16th one emits a `channel.metrics` trace event with the
    /// queue depth and how long this `send` call took (backpressure latency).
    /// Failed sends are not counted, so the counter reflects messages actually
    /// delivered to the channel — the same rule [`InstrumentedReceiver::recv`]
    /// applies to received messages.
    ///
    /// # Errors
    ///
    /// Returns [`mpsc::error::SendError`] (carrying the unsent value) if the
    /// channel receiver has been dropped.
    pub async fn send(&self, value: T) -> Result<(), mpsc::error::SendError<T>> {
        #[cfg(feature = "profiling")]
        let start = std::time::Instant::now();

        let result = self.inner.send(value).await;

        #[cfg(feature = "profiling")]
        if result.is_ok() {
            let count = self.sent.fetch_add(1, Ordering::Relaxed) + 1;
            // Sample every 16th send (low 4 bits zero) to keep tracing overhead small;
            // the latency and depth are only computed when an event is emitted.
            if count.trailing_zeros() >= 4 {
                let latency_us = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
                // `capacity()` should never exceed `max_capacity()`, but saturate so a
                // change in tokio's permit accounting cannot cause an underflow panic.
                let depth = self
                    .inner
                    .max_capacity()
                    .saturating_sub(self.inner.capacity());
                tracing::trace!(
                    channel = self.name,
                    sent = count,
                    queue_depth = depth,
                    backpressure_latency_us = latency_us,
                    "channel.metrics"
                );
            }
        }

        result
    }

    /// Attempt to send without waiting.
    ///
    /// Returns immediately with an error if the channel is full or the
    /// receiver has been dropped. Does not update the sent counter.
    ///
    /// # Errors
    ///
    /// Returns [`mpsc::error::TrySendError`] if the channel is full or the receiver is gone.
    pub fn try_send(&self, value: T) -> Result<(), mpsc::error::TrySendError<T>> {
        self.inner.try_send(value)
    }

    /// Returns a reference to the inner sender.
    ///
    /// Sends made directly through the returned sender bypass instrumentation.
    #[must_use]
    pub fn inner(&self) -> &mpsc::Sender<T> {
        &self.inner
    }

    /// Extracts the inner sender, discarding instrumentation.
    ///
    /// Use when passing the sender to code that does not accept the instrumented
    /// wrapper (e.g., a watcher that takes ownership of `mpsc::Sender<T>`).
    #[must_use]
    pub fn into_inner(self) -> mpsc::Sender<T> {
        self.inner
    }
}

impl<T> Clone for InstrumentedSender<T> {
    /// Produce a new handle to the same channel whose `sent` counter starts at zero.
    ///
    /// Counters are deliberately per-handle rather than shared: each clone
    /// reports only the messages it dispatched itself, so trace events from
    /// concurrent producers are never merged. The `sent` field of a trace
    /// event is therefore **this clone's** count since it was created, not
    /// the channel-wide throughput.
    fn clone(&self) -> Self {
        let inner = self.inner.clone();
        let name = self.name;
        Self {
            inner,
            name,
            #[cfg(feature = "profiling")]
            sent: AtomicU64::new(0),
        }
    }
}

/// Receiver half of an instrumented bounded channel, wrapping [`mpsc::Receiver`].
///
/// With the `profiling` feature enabled, the number of messages received
/// through [`InstrumentedReceiver::recv`] is counted.
pub struct InstrumentedReceiver<T> {
    /// The wrapped tokio receiver that every operation delegates to.
    inner: mpsc::Receiver<T>,
    /// Channel label for diagnostics. Nothing reads it in some build
    /// configurations, hence the `dead_code` allowance.
    #[allow(dead_code)]
    name: &'static str,
    /// Count of values delivered by `recv` (only `Some` results are counted).
    #[cfg(feature = "profiling")]
    received: AtomicU64,
}

impl<T> InstrumentedReceiver<T> {
    /// Wrap an existing [`mpsc::Receiver`] in an instrumented receiver.
    ///
    /// Use this when the receiver is constructed outside the
    /// [`instrumented_channel`] constructor (e.g., from a pre-existing
    /// `WatcherBundle`).
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio::sync::mpsc;
    /// use zeph_core::instrumented_channel::InstrumentedReceiver;
    ///
    /// let (_, rx) = mpsc::channel::<u32>(4);
    /// let _instrumented = InstrumentedReceiver::from_receiver(rx, "my-channel");
    /// ```
    #[must_use]
    pub fn from_receiver(inner: mpsc::Receiver<T>, name: &'static str) -> Self {
        Self {
            inner,
            name,
            #[cfg(feature = "profiling")]
            received: AtomicU64::new(0),
        }
    }

    /// Receive the next value, recording metrics when profiling is active.
    ///
    /// Returns `None` when all senders have been dropped and the channel is empty.
    /// With `profiling` enabled, each `Some` result increments the `received`
    /// counter, and every 16th one emits a `channel.metrics` trace event tagged
    /// with the channel name. Without that event the counter would never be
    /// observable.
    pub async fn recv(&mut self) -> Option<T> {
        let result = self.inner.recv().await;
        #[cfg(feature = "profiling")]
        if result.is_some() {
            let count = self.received.fetch_add(1, Ordering::Relaxed) + 1;
            // Same sampling cadence as `InstrumentedSender::send` (low 4 bits zero).
            if count.trailing_zeros() >= 4 {
                tracing::trace!(channel = self.name, received = count, "channel.metrics");
            }
        }
        result
    }

    /// Returns a mutable reference to the inner receiver.
    ///
    /// Values received directly through the returned receiver are not counted.
    pub fn inner_mut(&mut self) -> &mut mpsc::Receiver<T> {
        &mut self.inner
    }

    /// Extracts the inner receiver, discarding instrumentation.
    ///
    /// Use when passing the receiver to code that does not accept
    /// the instrumented wrapper (e.g., agent builder methods).
    #[must_use]
    pub fn into_inner(self) -> mpsc::Receiver<T> {
        self.inner
    }
}

/// Unbounded-channel sender that wraps [`mpsc::UnboundedSender`] with optional metrics.
///
/// Unbounded channels never apply backpressure and have no capacity limit,
/// so only the number of sent messages is tracked (when `profiling` is
/// enabled). There is no queue-depth or latency measurement.
pub struct InstrumentedUnboundedSender<T> {
    /// The wrapped tokio sender that every operation delegates to.
    inner: mpsc::UnboundedSender<T>,
    /// Channel label reported in trace events.
    name: &'static str,
    /// Send counter owned by this handle; clones start again from zero.
    #[cfg(feature = "profiling")]
    sent: AtomicU64,
}

impl<T> InstrumentedUnboundedSender<T> {
    /// Wrap an existing [`mpsc::UnboundedSender`] in an instrumented sender.
    ///
    /// Use this when the sender is constructed outside the
    /// [`instrumented_unbounded_channel`] constructor (e.g., obtained from
    /// an external channel factory).
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio::sync::mpsc;
    /// use zeph_core::instrumented_channel::InstrumentedUnboundedSender;
    ///
    /// let (tx, _rx) = mpsc::unbounded_channel::<String>();
    /// let _instrumented = InstrumentedUnboundedSender::from_sender(tx, "status");
    /// ```
    #[must_use]
    pub fn from_sender(inner: mpsc::UnboundedSender<T>, name: &'static str) -> Self {
        Self {
            inner,
            name,
            #[cfg(feature = "profiling")]
            sent: AtomicU64::new(0),
        }
    }

    /// Send a value, recording sent count when profiling is active.
    ///
    /// With `profiling` enabled, each successful send increments the counter,
    /// and every 64th one emits a `tracing::trace!` event with the cumulative
    /// sent count. Failed sends (receiver dropped) are not counted. No queue
    /// depth or backpressure is tracked for unbounded channels.
    ///
    /// # Errors
    ///
    /// Returns [`mpsc::error::SendError`] if the receiver has been dropped.
    pub fn send(&self, value: T) -> Result<(), mpsc::error::SendError<T>> {
        let result = self.inner.send(value);
        #[cfg(feature = "profiling")]
        if result.is_ok() {
            let count = self.sent.fetch_add(1, Ordering::Relaxed) + 1;
            // Sample every 64th send (low 6 bits zero) to keep tracing overhead small.
            if count.trailing_zeros() >= 6 {
                tracing::trace!(channel = self.name, sent = count, "channel.metrics");
            }
        }
        result
    }

    /// Returns a reference to the inner sender.
    ///
    /// Sends made directly through the returned sender bypass instrumentation.
    #[must_use]
    pub fn inner(&self) -> &mpsc::UnboundedSender<T> {
        &self.inner
    }

    /// Extracts the inner sender, discarding instrumentation.
    ///
    /// Mirrors [`InstrumentedSender::into_inner`]. Use it when handing the
    /// sender to code that expects a plain [`mpsc::UnboundedSender`].
    #[must_use]
    pub fn into_inner(self) -> mpsc::UnboundedSender<T> {
        self.inner
    }
}

impl<T> Clone for InstrumentedUnboundedSender<T> {
    /// Produce a new handle to the same channel whose `sent` counter starts at zero.
    ///
    /// As with [`InstrumentedSender`], counters are per-handle so that trace
    /// events from concurrent producers are never aggregated.
    fn clone(&self) -> Self {
        let inner = self.inner.clone();
        let name = self.name;
        Self {
            inner,
            name,
            #[cfg(feature = "profiling")]
            sent: AtomicU64::new(0),
        }
    }
}

/// Create an instrumented bounded mpsc channel pair.
///
/// `buffer` is the channel capacity and is passed straight to [`mpsc::channel`].
/// The `name` is used in tracing events to identify the channel.
/// Use a short, static `&'static str` such as `"skill_reload"` or
/// `"config_reload"` — it is embedded in every trace event.
///
/// # Panics
///
/// Panics if `buffer` is 0 (or exceeds tokio's maximum channel capacity),
/// because [`mpsc::channel`] rejects such capacities.
///
/// # Examples
///
/// ```
/// use zeph_core::instrumented_channel::instrumented_channel;
///
/// let (tx, mut rx) = instrumented_channel::<u32>(32, "my-channel");
/// tx.try_send(42).expect("channel has capacity");
/// assert_eq!(rx.inner_mut().try_recv().expect("value queued"), 42);
/// ```
#[must_use]
pub fn instrumented_channel<T>(
    buffer: usize,
    name: &'static str,
) -> (InstrumentedSender<T>, InstrumentedReceiver<T>) {
    let (tx, rx) = mpsc::channel(buffer);
    let sender = InstrumentedSender {
        inner: tx,
        name,
        #[cfg(feature = "profiling")]
        sent: AtomicU64::new(0),
    };
    let receiver = InstrumentedReceiver {
        inner: rx,
        name,
        #[cfg(feature = "profiling")]
        received: AtomicU64::new(0),
    };
    (sender, receiver)
}

/// Create an instrumented unbounded mpsc channel pair.
///
/// Returns an [`InstrumentedUnboundedSender`] and a raw [`mpsc::UnboundedReceiver`].
/// The receiver is not wrapped because unbounded channels have no meaningful
/// receive-side metrics beyond what the sender already tracks.
///
/// # Examples
///
/// ```
/// use zeph_core::instrumented_channel::instrumented_unbounded_channel;
///
/// let (tx, mut rx) = instrumented_unbounded_channel::<String>("status");
/// tx.send("hello".to_string()).expect("receiver alive");
/// assert_eq!(rx.try_recv().expect("value queued"), "hello");
/// ```
#[must_use]
pub fn instrumented_unbounded_channel<T>(
    name: &'static str,
) -> (InstrumentedUnboundedSender<T>, mpsc::UnboundedReceiver<T>) {
    let (tx, rx) = mpsc::unbounded_channel();
    let sender = InstrumentedUnboundedSender {
        inner: tx,
        name,
        #[cfg(feature = "profiling")]
        sent: AtomicU64::new(0),
    };
    (sender, rx)
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Each `send` on a bounded channel bumps the `sent` counter by one.
    #[cfg(feature = "profiling")]
    #[tokio::test]
    async fn bounded_send_increments_counter() {
        let (sender, mut receiver) = instrumented_channel::<u32>(8, "test-bounded");
        assert_eq!(sender.sent.load(Ordering::Relaxed), 0);

        for (value, expected) in [(1_u32, 1_u64), (2, 2)] {
            sender.send(value).await.expect("send succeeds");
            assert_eq!(sender.sent.load(Ordering::Relaxed), expected);
        }

        // Drain both queued values to avoid log noise from a dropped channel.
        for _ in 0..2 {
            let _ = receiver.recv().await;
        }
    }

    /// A cloned `InstrumentedSender` starts its own counter at 0.
    #[cfg(feature = "profiling")]
    #[tokio::test]
    async fn clone_resets_counter_to_zero() {
        let (original, mut receiver) = instrumented_channel::<u32>(8, "test-clone");
        for value in [1_u32, 2] {
            original.send(value).await.expect("send succeeds");
        }
        assert_eq!(original.sent.load(Ordering::Relaxed), 2);

        let cloned = original.clone();
        assert_eq!(cloned.sent.load(Ordering::Relaxed), 0, "clone starts at 0");

        // Creating the clone leaves the source handle's count untouched.
        assert_eq!(original.sent.load(Ordering::Relaxed), 2);

        for _ in 0..2 {
            let _ = receiver.recv().await;
        }
    }

    /// A `send` on an unbounded channel bumps the `sent` counter.
    #[cfg(feature = "profiling")]
    #[test]
    fn unbounded_send_increments_counter() {
        let (sender, _receiver) = instrumented_unbounded_channel::<u32>("test-unbounded");
        assert_eq!(sender.sent.load(Ordering::Relaxed), 0);

        sender.send(1).expect("send succeeds");
        assert_eq!(sender.sent.load(Ordering::Relaxed), 1);
    }

    /// A cloned `InstrumentedUnboundedSender` starts its own counter at 0.
    #[cfg(feature = "profiling")]
    #[test]
    fn unbounded_clone_resets_counter() {
        let (original, _receiver) =
            instrumented_unbounded_channel::<u32>("test-unbounded-clone");
        original.send(1).expect("send succeeds");
        assert_eq!(original.sent.load(Ordering::Relaxed), 1);

        let cloned = original.clone();
        assert_eq!(cloned.sent.load(Ordering::Relaxed), 0);
    }

    /// `from_receiver` wraps a pre-built receiver and still delivers values.
    #[tokio::test]
    async fn from_receiver_wraps_existing() {
        let (raw_tx, raw_rx) = tokio::sync::mpsc::channel::<u32>(4);
        let mut wrapped = InstrumentedReceiver::from_receiver(raw_rx, "wrap-test");
        raw_tx.send(42).await.expect("send succeeds");
        assert_eq!(wrapped.recv().await.expect("recv succeeds"), 42);
    }

    /// `from_sender` wraps a pre-built unbounded sender and still delivers values.
    #[test]
    fn from_sender_wraps_existing() {
        let (raw_tx, mut raw_rx) = tokio::sync::mpsc::unbounded_channel::<u32>();
        let wrapped = InstrumentedUnboundedSender::from_sender(raw_tx, "wrap-unbounded");
        wrapped.send(99).expect("send succeeds");
        assert_eq!(raw_rx.try_recv().expect("value available"), 99);
    }
}