rama_net/stream/layer/tracker/
outgoing.rs1use super::bytes::BytesRWTracker;
2use crate::{
3 client::{ConnectorService, EstablishedClientConnection},
4 stream::Stream,
5};
6use rama_core::{Context, Layer, Service};
7use rama_utils::macros::define_inner_service_accessors;
8use std::fmt;
9
10pub struct OutgoingBytesTrackerService<S> {
15 inner: S,
16}
17
18impl<S: fmt::Debug> fmt::Debug for OutgoingBytesTrackerService<S> {
19 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
20 f.debug_struct("OutgoingBytesTrackerService")
21 .field("inner", &self.inner)
22 .finish()
23 }
24}
25
26impl<S> OutgoingBytesTrackerService<S> {
27 pub const fn new(inner: S) -> Self {
31 Self { inner }
32 }
33
34 define_inner_service_accessors!();
35}
36
37impl<S> Clone for OutgoingBytesTrackerService<S>
38where
39 S: Clone,
40{
41 fn clone(&self) -> Self {
42 Self {
43 inner: self.inner.clone(),
44 }
45 }
46}
47
48impl<S, State, Request> Service<State, Request> for OutgoingBytesTrackerService<S>
49where
50 S: ConnectorService<State, Request, Connection: Stream + Unpin, Error: Send + 'static>,
51 State: Clone + Send + Sync + 'static,
52 Request: Send + 'static,
53{
54 type Response = EstablishedClientConnection<BytesRWTracker<S::Connection>, State, Request>;
55 type Error = S::Error;
56
57 async fn serve(
58 &self,
59 ctx: Context<State>,
60 req: Request,
61 ) -> Result<Self::Response, Self::Error> {
62 let EstablishedClientConnection { mut ctx, req, conn } =
63 self.inner.connect(ctx, req).await?;
64 let conn = BytesRWTracker::new(conn);
65 let handle = conn.handle();
66 ctx.insert(handle);
67 Ok(EstablishedClientConnection { ctx, req, conn })
68 }
69}
70
71#[derive(Debug, Clone)]
77#[non_exhaustive]
78pub struct OutgoingBytesTrackerLayer;
79
80impl OutgoingBytesTrackerLayer {
81 pub const fn new() -> Self {
83 Self
84 }
85}
86
87impl Default for OutgoingBytesTrackerLayer {
88 fn default() -> Self {
89 Self::new()
90 }
91}
92
93impl<S> Layer<S> for OutgoingBytesTrackerLayer {
94 type Service = OutgoingBytesTrackerService<S>;
95
96 fn layer(&self, inner: S) -> Self::Service {
97 OutgoingBytesTrackerService { inner }
98 }
99}