1use std::pin::pin;
2
3use tpex::Action;
4
5use super::tokens;
6
7
/// Write-through wrapper that remembers the bytes it has passed on.
///
/// The wrapped value lives in `base`; `cache` accumulates a copy of the bytes
/// that were successfully written through this view (the recording itself is
/// done by the `AsyncWrite` implementation for this type).
struct CachedFileView<Stream> {
    /// Underlying writer that receives the real writes.
    base: Stream,
    /// Copy of everything recorded through this view so far.
    cache: Vec<u8>,
}

// No trait bounds here: neither construction nor extraction touches the
// stream, so the `AsyncWrite` requirement belongs only on the impl that
// actually writes.
impl<Stream> CachedFileView<Stream> {
    /// Wraps `base` with an initially empty cache.
    fn new(base: Stream) -> Self {
        Self {
            base,
            cache: Vec::new(),
        }
    }

    /// Consumes the view and returns the bytes recorded so far.
    ///
    /// The wrapped `base` is dropped; when it is a `&mut` borrow this simply
    /// ends the borrow.
    fn extract(self) -> Vec<u8> {
        self.cache
    }
}
20impl<Stream: tokio::io::AsyncWrite + Unpin> tokio::io::AsyncWrite for CachedFileView<Stream> {
21 fn poll_write(
22 mut self: std::pin::Pin<&mut Self>,
23 cx: &mut std::task::Context<'_>,
24 buf: &[u8],
25 ) -> std::task::Poll<Result<usize, std::io::Error>> {
26 let ret = pin!(&mut self.base).poll_write(cx, buf);
27 if let std::task::Poll::Ready(Ok(len)) = ret {
28 self.cache.extend_from_slice(&buf[..len]);
29 }
30 ret
31 }
32
33 fn poll_flush(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), std::io::Error>> {
34 pin!(&mut self.base).poll_flush(cx)
35 }
36
37 fn poll_shutdown(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), std::io::Error>> {
38 pin!(&mut self.base).poll_shutdown(cx)
39 }
40}
41
42pub(crate) struct TPExState<Stream: tokio::io::AsyncSeek + tokio::io::AsyncWrite + tokio::io::AsyncRead + Unpin> {
43 state: tpex::State,
44 file: Stream,
45 cache: Vec<String>
46}
47impl<Stream: tokio::io::AsyncSeek + tokio::io::AsyncWrite + tokio::io::AsyncRead + Unpin> TPExState<Stream> {
48 pub fn new(state: tpex::State, file: Stream, cache: Vec<String>) -> Self {
49 Self { state, file, cache }
50 }
51
52 pub async fn apply(&mut self, action: Action) -> Result<u64, tpex::Error> {
53 let mut stream = CachedFileView::new(&mut self.file);
54 let ret = self.state.apply(action, &mut stream).await?;
55 self.cache.push(String::from_utf8(stream.extract()).expect("Produced non-utf8 log line"));
56 Ok(ret)
57 }
58
59 pub(crate) fn cache(&self) -> &[String] {
60 &self.cache
61 }
62
63 pub(crate) fn state(&self) -> &tpex::State {
64 &self.state
65 }
66 }
75
76pub(crate) struct StateStruct<Stream: tokio::io::AsyncSeek + tokio::io::AsyncWrite + tokio::io::AsyncRead + Unpin> {
77 pub(crate) tpex: tokio::sync::RwLock<TPExState<Stream>>,
78 pub(crate) tokens: tokens::TokenHandler,
79 pub(crate) updated: tokio::sync::watch::Sender<u64>,
80}
/// Expands to the type of the shared state handle: an `Arc` around
/// `StateStruct` over an opaque (`impl Trait`) stream.
///
/// `macro_rules!` resolves paths where the macro is invoked, so every
/// non-prelude name is written as an absolute path. Callers therefore do not
/// need `AsyncBufRead`, `AsyncWrite` or `AsyncSeek` imported for the expansion
/// to compile; the invoking crate only needs `tokio` as a dependency.
#[macro_export]
macro_rules! state_type {
    () => {
        ::std::sync::Arc<
            $crate::server::state::StateStruct<
                impl ::tokio::io::AsyncBufRead
                    + ::tokio::io::AsyncWrite
                    + ::tokio::io::AsyncSeek
                    + Unpin
                    + Send
                    + Sync
                    + 'static
            >
        >
    };
}