cat_dev/fsemul/pcfs/sata/server/wal/
layer.rs1use crate::{
4 errors::CatBridgeError,
5 fsemul::pcfs::sata::server::wal::WriteAheadLog,
6 net::{
7 models::{Request, Response},
8 server::models::ResponseStreamEvent,
9 },
10};
11use std::{
12 convert::Infallible,
13 pin::Pin,
14 task::{Context, Poll},
15};
16use tower::{Layer, Service};
17
18#[derive(Clone, Debug)]
21pub struct WALBeginStreamLayer(pub WriteAheadLog);
22
23impl<Layered> Layer<Layered> for WALBeginStreamLayer
24where
25 Layered: Clone,
26{
27 type Service = LayeredBeginWALStream<Layered>;
28
29 fn layer(&self, inner: Layered) -> Self::Service {
30 LayeredBeginWALStream {
31 inner,
32 log: self.0.clone(),
33 }
34 }
35}
36
37#[derive(Clone)]
38pub struct LayeredBeginWALStream<Layered> {
39 inner: Layered,
40 log: WriteAheadLog,
41}
42
43impl<Layered, State: Clone + Send + Sync + 'static> Service<ResponseStreamEvent<State>>
44 for LayeredBeginWALStream<Layered>
45where
46 Layered: Service<ResponseStreamEvent<State>, Response = bool, Error = CatBridgeError>
47 + Clone
48 + Send
49 + 'static,
50 Layered::Future: Send + 'static,
51{
52 type Response = Layered::Response;
53 type Error = Layered::Error;
54 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
55
56 #[inline]
57 fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
58 self.inner.poll_ready(ctx)
59 }
60
61 fn call(&mut self, evt: ResponseStreamEvent<State>) -> Self::Future {
62 let log = self.log.clone();
63 let mut inner = self.inner.clone();
64
65 Box::pin(async move {
66 log.record_open_stream(evt.stream_id()).await;
67 inner.call(evt).await
68 })
69 }
70}
71
72#[derive(Clone, Debug)]
75pub struct WALEndStreamLayer(pub WriteAheadLog);
76
77impl<Layered> Layer<Layered> for WALEndStreamLayer
78where
79 Layered: Clone,
80{
81 type Service = LayeredEndWALStream<Layered>;
82
83 fn layer(&self, inner: Layered) -> Self::Service {
84 LayeredEndWALStream {
85 inner,
86 log: self.0.clone(),
87 }
88 }
89}
90
91#[derive(Clone)]
92pub struct LayeredEndWALStream<Layered> {
93 inner: Layered,
94 log: WriteAheadLog,
95}
96
97impl<Layered, State: Clone + Send + Sync + 'static> Service<ResponseStreamEvent<State>>
98 for LayeredEndWALStream<Layered>
99where
100 Layered: Service<ResponseStreamEvent<State>, Response = (), Error = CatBridgeError>
101 + Clone
102 + Send
103 + 'static,
104 Layered::Future: Send + 'static,
105{
106 type Response = Layered::Response;
107 type Error = Layered::Error;
108 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
109
110 #[inline]
111 fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
112 self.inner.poll_ready(ctx)
113 }
114
115 fn call(&mut self, evt: ResponseStreamEvent<State>) -> Self::Future {
116 let log = self.log.clone();
117 let mut inner = self.inner.clone();
118
119 Box::pin(async move {
120 log.record_close_stream(evt.stream_id()).await;
121 inner.call(evt).await
122 })
123 }
124}
125
126#[derive(Clone, Debug)]
129pub struct WALMessageLayer(pub WriteAheadLog);
130
131impl<Layered> Layer<Layered> for WALMessageLayer
132where
133 Layered: Clone,
134{
135 type Service = LayeredWALMessage<Layered>;
136
137 fn layer(&self, inner: Layered) -> Self::Service {
138 LayeredWALMessage {
139 inner,
140 log: self.0.clone(),
141 }
142 }
143}
144
145#[derive(Clone)]
146pub struct LayeredWALMessage<Layered> {
147 inner: Layered,
148 log: WriteAheadLog,
149}
150
151impl<Layered, State: Clone + Send + Sync + 'static> Service<Request<State>>
152 for LayeredWALMessage<Layered>
153where
154 Layered:
155 Service<Request<State>, Response = Response, Error = Infallible> + Clone + Send + 'static,
156 Layered::Future: Send + 'static,
157{
158 type Response = Layered::Response;
159 type Error = Layered::Error;
160 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
161
162 #[inline]
163 fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
164 self.inner.poll_ready(ctx)
165 }
166
167 fn call(&mut self, mut req: Request<State>) -> Self::Future {
168 let log = self.log.clone();
169 let mut inner = self.inner.clone();
170
171 Box::pin(async move {
172 let sid = req.stream_id();
173 log.record_request(sid, req.body().clone()).await;
174 req.extensions_mut().insert(log.clone());
175 match inner.call(req).await {
176 Ok(resp) => {
177 if let Some(bod) = resp.body() {
178 log.record_response(sid, bod.clone()).await;
179 }
180 Ok::<Layered::Response, Layered::Error>(resp)
181 }
182 Err(cause) => Err::<Layered::Response, Layered::Error>(cause),
183 }
184 })
185 }
186}