// rama_net/stream/layer/tracker/outgoing.rs

1use super::bytes::BytesRWTracker;
2use crate::{
3    client::{ConnectorService, EstablishedClientConnection},
4    stream::Stream,
5};
6use rama_core::{Context, Layer, Service};
7use rama_utils::macros::define_inner_service_accessors;
8use std::fmt;
9
/// [`Service`] adapter that places an atomic R/W tracker around the IO [`Stream`]
/// produced by the [`Service`] it wraps.
///
/// [`Service`]: rama_core::Service
/// [`Stream`]: crate::stream::Stream
pub struct OutgoingBytesTrackerService<S> {
    /// The wrapped service.
    inner: S,
}
17
18impl<S: fmt::Debug> fmt::Debug for OutgoingBytesTrackerService<S> {
19    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
20        f.debug_struct("OutgoingBytesTrackerService")
21            .field("inner", &self.inner)
22            .finish()
23    }
24}
25
26impl<S> OutgoingBytesTrackerService<S> {
27    /// Create a new [`OutgoingBytesTrackerService`].
28    ///
29    /// See [`OutgoingBytesTrackerService`] for more information.
30    pub const fn new(inner: S) -> Self {
31        Self { inner }
32    }
33
34    define_inner_service_accessors!();
35}
36
37impl<S> Clone for OutgoingBytesTrackerService<S>
38where
39    S: Clone,
40{
41    fn clone(&self) -> Self {
42        Self {
43            inner: self.inner.clone(),
44        }
45    }
46}
47
48impl<S, State, Request> Service<State, Request> for OutgoingBytesTrackerService<S>
49where
50    S: ConnectorService<State, Request, Connection: Stream + Unpin, Error: Send + 'static>,
51    State: Clone + Send + Sync + 'static,
52    Request: Send + 'static,
53{
54    type Response = EstablishedClientConnection<BytesRWTracker<S::Connection>, State, Request>;
55    type Error = S::Error;
56
57    async fn serve(
58        &self,
59        ctx: Context<State>,
60        req: Request,
61    ) -> Result<Self::Response, Self::Error> {
62        let EstablishedClientConnection { mut ctx, req, conn } =
63            self.inner.connect(ctx, req).await?;
64        let conn = BytesRWTracker::new(conn);
65        let handle = conn.handle();
66        ctx.insert(handle);
67        Ok(EstablishedClientConnection { ctx, req, conn })
68    }
69}
70
/// [`Layer`] producing services that wrap the output IO [`Stream`] of a
/// [`Service`] with an atomic R/W tracker.
///
/// [`Layer`]: rama_core::Layer
/// [`Service`]: rama_core::Service
/// [`Stream`]: crate::stream::Stream
#[non_exhaustive]
#[derive(Clone, Debug)]
pub struct OutgoingBytesTrackerLayer;
79
80impl OutgoingBytesTrackerLayer {
81    /// Create a new [`OutgoingBytesTrackerLayer`].
82    pub const fn new() -> Self {
83        Self
84    }
85}
86
87impl Default for OutgoingBytesTrackerLayer {
88    fn default() -> Self {
89        Self::new()
90    }
91}
92
93impl<S> Layer<S> for OutgoingBytesTrackerLayer {
94    type Service = OutgoingBytesTrackerService<S>;
95
96    fn layer(&self, inner: S) -> Self::Service {
97        OutgoingBytesTrackerService { inner }
98    }
99}