rama_net/stream/layer/tracker/
incoming.rs

1use super::bytes::BytesRWTracker;
2use crate::stream::Stream;
3use rama_core::{Context, Layer, Service};
4use rama_utils::macros::define_inner_service_accessors;
5use std::fmt;
6
/// Middleware [`Service`] that hands its inner [`Service`] a byte-tracked
/// version of the incoming IO [`Stream`].
///
/// The incoming stream is wrapped in an atomic R/W tracker before it is
/// forwarded to the wrapped service.
///
/// [`Service`]: rama_core::Service
/// [`Stream`]: crate::stream::Stream
pub struct IncomingBytesTrackerService<S> {
    /// The wrapped service that receives the tracked stream.
    inner: S,
}
14
15impl<S: fmt::Debug> fmt::Debug for IncomingBytesTrackerService<S> {
16    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
17        f.debug_struct("IncomingBytesTrackerService")
18            .field("inner", &self.inner)
19            .finish()
20    }
21}
22
23impl<S> IncomingBytesTrackerService<S> {
24    /// Create a new [`IncomingBytesTrackerService`].
25    ///
26    /// See [`IncomingBytesTrackerService`] for more information.
27    pub const fn new(inner: S) -> Self {
28        Self { inner }
29    }
30
31    define_inner_service_accessors!();
32}
33
34impl<S> Clone for IncomingBytesTrackerService<S>
35where
36    S: Clone,
37{
38    fn clone(&self) -> Self {
39        Self {
40            inner: self.inner.clone(),
41        }
42    }
43}
44
45impl<State, S, IO> Service<State, IO> for IncomingBytesTrackerService<S>
46where
47    State: Clone + Send + Sync + 'static,
48    S: Service<State, BytesRWTracker<IO>>,
49    IO: Stream,
50{
51    type Response = S::Response;
52    type Error = S::Error;
53
54    fn serve(
55        &self,
56        mut ctx: Context<State>,
57        stream: IO,
58    ) -> impl Future<Output = Result<Self::Response, Self::Error>> + Send + '_ {
59        let tracked_stream = BytesRWTracker::new(stream);
60        let handle = tracked_stream.handle();
61        ctx.insert(handle);
62        self.inner.serve(ctx, tracked_stream)
63    }
64}
65
/// [`Layer`] that produces an [`IncomingBytesTrackerService`], which wraps a
/// [`Service`]'s input IO [`Stream`] with an atomic R/W tracker.
///
/// [`Layer`]: rama_core::Layer
/// [`Service`]: rama_core::Service
/// [`Stream`]: crate::stream::Stream
#[non_exhaustive]
#[derive(Clone, Debug)]
pub struct IncomingBytesTrackerLayer;
74
75impl IncomingBytesTrackerLayer {
76    /// Create a new [`IncomingBytesTrackerLayer`].
77    pub const fn new() -> Self {
78        Self
79    }
80}
81
82impl Default for IncomingBytesTrackerLayer {
83    fn default() -> Self {
84        Self::new()
85    }
86}
87
88impl<S> Layer<S> for IncomingBytesTrackerLayer {
89    type Service = IncomingBytesTrackerService<S>;
90
91    fn layer(&self, inner: S) -> Self::Service {
92        IncomingBytesTrackerService { inner }
93    }
94}