// richat_shared/transports/mod.rs

1pub mod grpc;
2pub mod quic;
3pub mod tcp;
4
5use {
6    futures::stream::BoxStream,
7    richat_proto::richat::RichatFilter,
8    solana_sdk::clock::Slot,
9    std::{
10        future::Future,
11        io::{self, IoSlice},
12        pin::Pin,
13        sync::Arc,
14        task::{ready, Context, Poll},
15    },
16    thiserror::Error,
17    tokio::io::AsyncWrite,
18};
19
/// Reference-counted byte buffer; the success item yielded by [`RecvStream`].
///
/// Cloning a `RecvItem` only bumps the `Arc` reference count, the underlying
/// `Vec<u8>` is shared rather than copied.
pub type RecvItem = Arc<Vec<u8>>;
21
22pub type RecvStream = BoxStream<'static, Result<RecvItem, RecvError>>;
23
24#[derive(Debug, Clone, Copy, PartialEq, Eq, Error)]
25pub enum RecvError {
26    #[error("channel lagged")]
27    Lagged,
28    #[error("channel closed")]
29    Closed,
30}
31
32#[derive(Debug, Error)]
33pub enum SubscribeError {
34    #[error("channel is not initialized yet")]
35    NotInitialized,
36    #[error("only available from slot {first_available}")]
37    SlotNotAvailable { first_available: Slot },
38}
39
40pub trait Subscribe {
41    fn subscribe(
42        &self,
43        replay_from_slot: Option<Slot>,
44        filter: Option<RichatFilter>,
45    ) -> Result<RecvStream, SubscribeError>;
46}
47
/// Future that writes all bytes of a list of [`IoSlice`] buffers to a writer.
///
/// Both the writer and the buffer list are borrowed for `'a`. While the future
/// makes progress it advances partially written slices in place, so after it
/// has been polled the caller's `IoSlice`s no longer describe their original
/// ranges.
#[derive(Debug)]
#[must_use = "futures do nothing unless polled or awaited"]
pub struct WriteVectored<'a, W: ?Sized> {
    /// Destination of the bytes.
    writer: &'a mut W,
    /// Buffers to write; a partially written buffer is advanced in place.
    buffers: &'a mut [IoSlice<'a>],
    /// Index of the first buffer that has not been fully written yet.
    offset: usize,
}

// `W: ?Sized` matches the struct definition and the `Future` impl, so unsized
// writers (for example trait objects) can be used as well.
impl<'a, W: ?Sized> WriteVectored<'a, W> {
    /// Creates a future that will write `buffers`, in order, to `writer`.
    ///
    /// Nothing is written until the returned future is polled.
    pub fn new(writer: &'a mut W, buffers: &'a mut [IoSlice<'a>]) -> Self {
        Self {
            writer,
            buffers,
            offset: 0,
        }
    }
}
64
65impl<W> Future for WriteVectored<'_, W>
66where
67    W: AsyncWrite + Unpin + ?Sized,
68{
69    type Output = io::Result<()>;
70
71    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
72        let me = unsafe { self.get_unchecked_mut() };
73        while me.offset < me.buffers.len() {
74            let bufs = &me.buffers[me.offset..];
75            let mut n = ready!(Pin::new(&mut *me.writer).poll_write_vectored(cx, bufs))?;
76            if n == 0 {
77                return Poll::Ready(Err(io::ErrorKind::WriteZero.into()));
78            }
79
80            while n > 0 {
81                if n >= me.buffers[me.offset].len() {
82                    n -= me.buffers[me.offset].len();
83                    me.offset += 1;
84                    continue;
85                }
86
87                me.buffers[me.offset].advance(n);
88                n = 0;
89            }
90        }
91        Poll::Ready(Ok(()))
92    }
93}