nitinol_protocol/io/
read.rs1use std::collections::BTreeSet;
2use std::fmt::Debug;
3use std::sync::Arc;
4use async_trait::async_trait;
5use nitinol_core::event::Event;
6use nitinol_core::identifier::{EntityId, ToEntityId};
7use crate::errors::ProtocolError;
8use crate::Payload;
9
10#[async_trait]
11pub trait Reader: 'static + Sync + Send {
12 async fn read(&self, id: EntityId, seq: i64) -> Result<Payload, ProtocolError>;
13 async fn read_to(&self, id: EntityId, from: i64, to: i64) -> Result<BTreeSet<Payload>, ProtocolError>;
14 async fn read_to_latest(&self, id: EntityId, from: i64) -> Result<BTreeSet<Payload>, ProtocolError> {
15 self.read_to(id, from, i64::MAX).await
16 }
17}
18
19
20pub struct ReadProtocol {
21 reader: Arc<dyn Reader>,
22}
23
24impl Debug for ReadProtocol {
25 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
26 f.debug_struct("ReadProtocol").finish()
27 }
28}
29
30impl Clone for ReadProtocol {
31 fn clone(&self) -> Self {
32 Self {
33 reader: Arc::clone(&self.reader),
34 }
35 }
36}
37
38impl ReadProtocol {
39 pub fn new(provider: impl Reader) -> Self {
40 Self {
41 reader: Arc::new(provider),
42 }
43 }
44
45 pub async fn read<E: Event>(&self, id: impl ToEntityId, seq: i64) -> Result<E, ProtocolError> {
46 let payload = self.reader.read(id.to_entity_id(), seq).await?;
47 E::from_bytes(&payload.bytes)
48 .map_err(|e| ProtocolError::Read(Box::new(e)))
49 }
50
51 pub async fn read_to(&self, id: impl ToEntityId, from: i64, to: i64) -> Result<BTreeSet<Payload>, ProtocolError> {
52 self.reader.read_to(id.to_entity_id(), from, to).await
53 }
54
55 pub async fn read_to_latest(&self, id: impl ToEntityId, from: i64) -> Result<BTreeSet<Payload>, ProtocolError> {
56 self.reader.read_to_latest(id.to_entity_id(), from).await
57 }
58}