rama_net/stream/layer/tracker/
incoming.rs1use super::bytes::BytesRWTracker;
2use crate::stream::Stream;
3use rama_core::{Context, Layer, Service};
4use rama_utils::macros::define_inner_service_accessors;
5use std::fmt;
6
7pub struct IncomingBytesTrackerService<S> {
12 inner: S,
13}
14
15impl<S: fmt::Debug> fmt::Debug for IncomingBytesTrackerService<S> {
16 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
17 f.debug_struct("IncomingBytesTrackerService")
18 .field("inner", &self.inner)
19 .finish()
20 }
21}
22
23impl<S> IncomingBytesTrackerService<S> {
24 pub const fn new(inner: S) -> Self {
28 Self { inner }
29 }
30
31 define_inner_service_accessors!();
32}
33
34impl<S> Clone for IncomingBytesTrackerService<S>
35where
36 S: Clone,
37{
38 fn clone(&self) -> Self {
39 Self {
40 inner: self.inner.clone(),
41 }
42 }
43}
44
45impl<State, S, IO> Service<State, IO> for IncomingBytesTrackerService<S>
46where
47 State: Clone + Send + Sync + 'static,
48 S: Service<State, BytesRWTracker<IO>>,
49 IO: Stream,
50{
51 type Response = S::Response;
52 type Error = S::Error;
53
54 fn serve(
55 &self,
56 mut ctx: Context<State>,
57 stream: IO,
58 ) -> impl Future<Output = Result<Self::Response, Self::Error>> + Send + '_ {
59 let tracked_stream = BytesRWTracker::new(stream);
60 let handle = tracked_stream.handle();
61 ctx.insert(handle);
62 self.inner.serve(ctx, tracked_stream)
63 }
64}
65
66#[derive(Debug, Clone)]
72#[non_exhaustive]
73pub struct IncomingBytesTrackerLayer;
74
75impl IncomingBytesTrackerLayer {
76 pub const fn new() -> Self {
78 Self
79 }
80}
81
82impl Default for IncomingBytesTrackerLayer {
83 fn default() -> Self {
84 Self::new()
85 }
86}
87
88impl<S> Layer<S> for IncomingBytesTrackerLayer {
89 type Service = IncomingBytesTrackerService<S>;
90
91 fn layer(&self, inner: S) -> Self::Service {
92 IncomingBytesTrackerService { inner }
93 }
94}