nitinol_protocol/io/
read.rs

1use std::collections::BTreeSet;
2use std::fmt::Debug;
3use std::sync::Arc;
4use async_trait::async_trait;
5use nitinol_core::event::Event;
6use nitinol_core::identifier::{EntityId, ToEntityId};
7use crate::errors::ProtocolError;
8use crate::Payload;
9
10#[async_trait]
11pub trait Reader: 'static + Sync + Send {
12    async fn read(&self, id: EntityId, seq: i64) -> Result<Payload, ProtocolError>;
13    async fn read_to(&self, id: EntityId, from: i64, to: i64) -> Result<BTreeSet<Payload>, ProtocolError>;
14    async fn read_to_latest(&self, id: EntityId, from: i64) -> Result<BTreeSet<Payload>, ProtocolError> {
15        self.read_to(id, from, i64::MAX).await
16    }
17}
18
19
20pub struct ReadProtocol {
21    reader: Arc<dyn Reader>,
22}
23
24impl Debug for ReadProtocol {
25    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
26        f.debug_struct("ReadProtocol").finish()
27    }
28}
29
30impl Clone for ReadProtocol {
31    fn clone(&self) -> Self {
32        Self {
33            reader: Arc::clone(&self.reader),
34        }
35    }
36}
37
38impl ReadProtocol {
39    pub fn new(provider: impl Reader) -> Self {
40        Self {
41            reader: Arc::new(provider),
42        }
43    }
44    
45    pub async fn read<E: Event>(&self, id: impl ToEntityId, seq: i64) -> Result<E, ProtocolError> {
46        let payload = self.reader.read(id.to_entity_id(), seq).await?;
47        E::from_bytes(&payload.bytes)
48            .map_err(|e| ProtocolError::Read(Box::new(e)))
49    }
50    
51    pub async fn read_to(&self, id: impl ToEntityId, from: i64, to: i64) -> Result<BTreeSet<Payload>, ProtocolError> {
52        self.reader.read_to(id.to_entity_id(), from, to).await
53    }
54    
55    pub async fn read_to_latest(&self, id: impl ToEntityId, from: i64) -> Result<BTreeSet<Payload>, ProtocolError> {
56        self.reader.read_to_latest(id.to_entity_id(), from).await
57    }
58}