cat_dev/fsemul/pcfs/sata/server/wal/
layer.rs

1//! Allow attaching the Write-Ahead log as an arbitrary layer to any server.
2
3use crate::{
4	errors::CatBridgeError,
5	fsemul::pcfs::sata::server::wal::WriteAheadLog,
6	net::{
7		models::{Request, Response},
8		server::models::ResponseStreamEvent,
9	},
10};
11use std::{
12	convert::Infallible,
13	pin::Pin,
14	task::{Context, Poll},
15};
16use tower::{Layer, Service};
17
/// A [`Layer`] that records stream-open events to a [`WriteAheadLog`].
///
/// Applying this layer wraps a service in [`LayeredBeginWALStream`], which
/// calls `record_open_stream` with the event's stream id before handing the
/// event to the wrapped service. It is meant to be attached to the
/// begin-stream hook of a TCP Server.
///
/// The single public field is the log handle; a clone of it is placed into
/// every service this layer wraps.
#[derive(Clone, Debug)]
pub struct WALBeginStreamLayer(pub WriteAheadLog);
22
23impl<Layered> Layer<Layered> for WALBeginStreamLayer
24where
25	Layered: Clone,
26{
27	type Service = LayeredBeginWALStream<Layered>;
28
29	fn layer(&self, inner: Layered) -> Self::Service {
30		LayeredBeginWALStream {
31			inner,
32			log: self.0.clone(),
33		}
34	}
35}
36
37#[derive(Clone)]
38pub struct LayeredBeginWALStream<Layered> {
39	inner: Layered,
40	log: WriteAheadLog,
41}
42
43impl<Layered, State: Clone + Send + Sync + 'static> Service<ResponseStreamEvent<State>>
44	for LayeredBeginWALStream<Layered>
45where
46	Layered: Service<ResponseStreamEvent<State>, Response = bool, Error = CatBridgeError>
47		+ Clone
48		+ Send
49		+ 'static,
50	Layered::Future: Send + 'static,
51{
52	type Response = Layered::Response;
53	type Error = Layered::Error;
54	type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
55
56	#[inline]
57	fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
58		self.inner.poll_ready(ctx)
59	}
60
61	fn call(&mut self, evt: ResponseStreamEvent<State>) -> Self::Future {
62		let log = self.log.clone();
63		let mut inner = self.inner.clone();
64
65		Box::pin(async move {
66			log.record_open_stream(evt.stream_id()).await;
67			inner.call(evt).await
68		})
69	}
70}
71
/// A [`Layer`] that records stream-close events to a [`WriteAheadLog`].
///
/// Applying this layer wraps a service in [`LayeredEndWALStream`], which
/// calls `record_close_stream` with the event's stream id before handing the
/// event to the wrapped service. It is meant to be attached to the
/// end-stream hook of a TCP Server.
///
/// The single public field is the log handle; a clone of it is placed into
/// every service this layer wraps.
#[derive(Clone, Debug)]
pub struct WALEndStreamLayer(pub WriteAheadLog);
76
77impl<Layered> Layer<Layered> for WALEndStreamLayer
78where
79	Layered: Clone,
80{
81	type Service = LayeredEndWALStream<Layered>;
82
83	fn layer(&self, inner: Layered) -> Self::Service {
84		LayeredEndWALStream {
85			inner,
86			log: self.0.clone(),
87		}
88	}
89}
90
91#[derive(Clone)]
92pub struct LayeredEndWALStream<Layered> {
93	inner: Layered,
94	log: WriteAheadLog,
95}
96
97impl<Layered, State: Clone + Send + Sync + 'static> Service<ResponseStreamEvent<State>>
98	for LayeredEndWALStream<Layered>
99where
100	Layered: Service<ResponseStreamEvent<State>, Response = (), Error = CatBridgeError>
101		+ Clone
102		+ Send
103		+ 'static,
104	Layered::Future: Send + 'static,
105{
106	type Response = Layered::Response;
107	type Error = Layered::Error;
108	type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
109
110	#[inline]
111	fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
112		self.inner.poll_ready(ctx)
113	}
114
115	fn call(&mut self, evt: ResponseStreamEvent<State>) -> Self::Future {
116		let log = self.log.clone();
117		let mut inner = self.inner.clone();
118
119		Box::pin(async move {
120			log.record_close_stream(evt.stream_id()).await;
121			inner.call(evt).await
122		})
123	}
124}
125
/// A [`Layer`] that records requests and responses to a [`WriteAheadLog`],
/// and attaches the log to each request as an extension.
///
/// Applying this layer wraps a service in [`LayeredWALMessage`], which
/// records the request body, inserts a clone of the log into the request's
/// extensions, calls the wrapped service, and records the response body
/// when the response has one.
///
/// The single public field is the log handle; a clone of it is placed into
/// every service this layer wraps.
#[derive(Clone, Debug)]
pub struct WALMessageLayer(pub WriteAheadLog);
130
131impl<Layered> Layer<Layered> for WALMessageLayer
132where
133	Layered: Clone,
134{
135	type Service = LayeredWALMessage<Layered>;
136
137	fn layer(&self, inner: Layered) -> Self::Service {
138		LayeredWALMessage {
139			inner,
140			log: self.0.clone(),
141		}
142	}
143}
144
145#[derive(Clone)]
146pub struct LayeredWALMessage<Layered> {
147	inner: Layered,
148	log: WriteAheadLog,
149}
150
151impl<Layered, State: Clone + Send + Sync + 'static> Service<Request<State>>
152	for LayeredWALMessage<Layered>
153where
154	Layered:
155		Service<Request<State>, Response = Response, Error = Infallible> + Clone + Send + 'static,
156	Layered::Future: Send + 'static,
157{
158	type Response = Layered::Response;
159	type Error = Layered::Error;
160	type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
161
162	#[inline]
163	fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
164		self.inner.poll_ready(ctx)
165	}
166
167	fn call(&mut self, mut req: Request<State>) -> Self::Future {
168		let log = self.log.clone();
169		let mut inner = self.inner.clone();
170
171		Box::pin(async move {
172			let sid = req.stream_id();
173			log.record_request(sid, req.body().clone()).await;
174			req.extensions_mut().insert(log.clone());
175			match inner.call(req).await {
176				Ok(resp) => {
177					if let Some(bod) = resp.body() {
178						log.record_response(sid, bod.clone()).await;
179					}
180					Ok::<Layered::Response, Layered::Error>(resp)
181				}
182				Err(cause) => Err::<Layered::Response, Layered::Error>(cause),
183			}
184		})
185	}
186}