richat_shared/transports/
mod.rs

1pub mod grpc;
2pub mod quic;
3
4use {
5    futures::stream::BoxStream,
6    richat_proto::richat::RichatFilter,
7    solana_sdk::clock::Slot,
8    std::{
9        future::Future,
10        io::{self, IoSlice},
11        pin::Pin,
12        sync::Arc,
13        task::{Context, Poll, ready},
14    },
15    thiserror::Error,
16    tokio::io::AsyncWrite,
17};
18
/// Item carried by a [`RecvStream`]: a byte buffer behind an [`Arc`], so
/// cloning an item shares the underlying allocation instead of copying it.
pub type RecvItem = Arc<Vec<u8>>;
20
/// Boxed `'static` stream whose items are either a [`RecvItem`] or a
/// [`RecvError`].
pub type RecvStream = BoxStream<'static, Result<RecvItem, RecvError>>;
22
/// Error that a [`RecvStream`] can yield in place of a [`RecvItem`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Error)]
pub enum RecvError {
    /// Displayed as "channel lagged".
    ///
    /// NOTE(review): presumably the consumer fell behind and some items were
    /// skipped — confirm against the channel implementation.
    #[error("channel lagged")]
    Lagged,
    /// Displayed as "channel closed".
    ///
    /// NOTE(review): presumably no further items will be produced — confirm
    /// against the channel implementation.
    #[error("channel closed")]
    Closed,
}
30
/// Error returned by [`Subscribe::subscribe`] when no stream can be handed out.
#[derive(Debug, Error)]
pub enum SubscribeError {
    /// Displayed as "channel is not initialized yet".
    #[error("channel is not initialized yet")]
    NotInitialized,
    /// Carries the slot that the error message reports as the first one
    /// available.
    ///
    /// NOTE(review): presumably returned when the requested replay slot is
    /// older than what is retained — confirm against implementors.
    #[error("only available from slot {first_available}")]
    SlotNotAvailable { first_available: Slot },
}
38
/// Something that can hand out [`RecvStream`]s to consumers.
pub trait Subscribe {
    /// Creates a new [`RecvStream`].
    ///
    /// # Arguments
    ///
    /// * `replay_from_slot` - optional slot to replay from.
    ///   NOTE(review): presumably `None` means "start from the newest data" —
    ///   confirm against implementors.
    /// * `filter` - optional [`RichatFilter`]; how it is applied is up to the
    ///   implementor and is not visible from this trait.
    ///
    /// # Errors
    ///
    /// Returns a [`SubscribeError`] when the stream cannot be created.
    fn subscribe(
        &self,
        replay_from_slot: Option<Slot>,
        filter: Option<RichatFilter>,
    ) -> Result<RecvStream, SubscribeError>;
}
46
/// Future that writes every byte of a set of [`IoSlice`] buffers to a writer
/// using vectored writes.
///
/// The buffer slice is consumed in place while the future makes progress:
/// fully written buffers are stepped over and a partially written buffer is
/// advanced, so the caller must not rely on the slices' contents afterwards.
#[derive(Debug)]
pub struct WriteVectored<'a, W: ?Sized> {
    /// Destination of the data.
    writer: &'a mut W,
    /// Buffers to send, in order.
    buffers: &'a mut [IoSlice<'a>],
    /// Index of the first buffer in `buffers` that is not fully written yet.
    offset: usize,
}

// `W: ?Sized` matches the bound on the struct itself, so the future can also
// be built around unsized writers such as trait objects.
impl<'a, W: ?Sized> WriteVectored<'a, W> {
    /// Creates a future that will write all of `buffers` to `writer`.
    ///
    /// Nothing is written until the returned future is polled.
    pub const fn new(writer: &'a mut W, buffers: &'a mut [IoSlice<'a>]) -> Self {
        Self {
            writer,
            buffers,
            offset: 0,
        }
    }
}
63
64impl<W> Future for WriteVectored<'_, W>
65where
66    W: AsyncWrite + Unpin + ?Sized,
67{
68    type Output = io::Result<()>;
69
70    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
71        let me = unsafe { self.get_unchecked_mut() };
72        while me.offset < me.buffers.len() {
73            let bufs = &me.buffers[me.offset..];
74            let mut n = ready!(Pin::new(&mut *me.writer).poll_write_vectored(cx, bufs))?;
75            if n == 0 {
76                return Poll::Ready(Err(io::ErrorKind::WriteZero.into()));
77            }
78
79            while n > 0 {
80                if n >= me.buffers[me.offset].len() {
81                    n -= me.buffers[me.offset].len();
82                    me.offset += 1;
83                    continue;
84                }
85
86                me.buffers[me.offset].advance(n);
87                n = 0;
88            }
89        }
90        Poll::Ready(Ok(()))
91    }
92}