tpex_api/server/
state.rs

1use std::pin::pin;
2
3use tpex::Action;
4
5use super::tokens;
6
7
8struct CachedFileView<Stream: tokio::io::AsyncWrite> {
9    base: Stream,
10    cache: Vec<u8>
11}
12impl<Stream: tokio::io::AsyncWrite> CachedFileView<Stream> {
13    fn new(base: Stream) -> Self {
14        CachedFileView { base, cache: Vec::new() }
15    }
16    fn extract(self) -> Vec<u8> {
17        self.cache
18    }
19}
20impl<Stream: tokio::io::AsyncWrite + Unpin> tokio::io::AsyncWrite for CachedFileView<Stream> {
21    fn poll_write(
22        mut self: std::pin::Pin<&mut Self>,
23        cx: &mut std::task::Context<'_>,
24        buf: &[u8],
25    ) -> std::task::Poll<Result<usize, std::io::Error>> {
26        let ret = pin!(&mut self.base).poll_write(cx, buf);
27        if let std::task::Poll::Ready(Ok(len)) = ret {
28            self.cache.extend_from_slice(&buf[..len]);
29        }
30        ret
31    }
32
33    fn poll_flush(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), std::io::Error>> {
34        pin!(&mut self.base).poll_flush(cx)
35    }
36
37    fn poll_shutdown(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), std::io::Error>> {
38        pin!(&mut self.base).poll_shutdown(cx)
39    }
40}
41
42pub(crate) struct TPExState<Stream: tokio::io::AsyncSeek + tokio::io::AsyncWrite + tokio::io::AsyncRead + Unpin> {
43    state: tpex::State,
44    file: Stream,
45    cache: Vec<String>
46}
47impl<Stream: tokio::io::AsyncSeek + tokio::io::AsyncWrite + tokio::io::AsyncRead + Unpin> TPExState<Stream> {
48    pub fn new(state: tpex::State, file: Stream, cache: Vec<String>) -> Self {
49        Self { state, file, cache }
50    }
51
52    pub async fn apply(&mut self, action: Action) -> Result<u64, tpex::Error> {
53        let mut stream = CachedFileView::new(&mut self.file);
54        let ret = self.state.apply(action, &mut stream).await?;
55        self.cache.push(String::from_utf8(stream.extract()).expect("Produced non-utf8 log line"));
56        Ok(ret)
57    }
58
59    pub(crate) fn cache(&self) -> &[String] {
60        &self.cache
61    }
62
63    pub(crate) fn state(&self) -> &tpex::State {
64        &self.state
65    }
66    // async fn get_lines(&mut self) -> Vec<u8> {
67    //     // Keeping everything in the log file means we can't have different versions of the same data
68    //     self.file.rewind().await.expect("Could not rewind trade file.");
69    //     let mut buf = Vec::new();
70    //     // This will seek to the end again, so pos is the same before and after get_lines
71    //     self.file.read_to_end(&mut buf).await.expect("Could not re-read trade file.");
72    //     buf
73    // }
74}
75
76pub(crate) struct StateStruct<Stream: tokio::io::AsyncSeek + tokio::io::AsyncWrite + tokio::io::AsyncRead + Unpin> {
77    pub(crate) tpex: tokio::sync::RwLock<TPExState<Stream>>,
78    pub(crate) tokens: tokens::TokenHandler,
79    pub(crate) updated: tokio::sync::watch::Sender<u64>,
80}
81#[macro_export]
82macro_rules! state_type {
83    () => {
84        std::sync::Arc<$crate::server::state::StateStruct<impl AsyncBufRead + AsyncWrite + AsyncSeek + Unpin + Send + Sync + 'static>>
85    };
86}