richat_shared/transports/
mod.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
pub mod grpc;
pub mod quic;
pub mod tcp;

use {
    futures::stream::BoxStream,
    richat_proto::richat::RichatFilter,
    solana_sdk::clock::Slot,
    std::{
        future::Future,
        io::{self, IoSlice},
        pin::Pin,
        sync::Arc,
        task::{ready, Context, Poll},
    },
    thiserror::Error,
    tokio::io::AsyncWrite,
};

pub type RecvItem = Arc<Vec<u8>>;

pub type RecvStream = BoxStream<'static, Result<RecvItem, RecvError>>;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Error)]
pub enum RecvError {
    #[error("channel lagged")]
    Lagged,
    #[error("channel closed")]
    Closed,
}

#[derive(Debug, Error)]
pub enum SubscribeError {
    #[error("channel is not initialized yet")]
    NotInitialized,
    #[error("only available from slot {first_available}")]
    SlotNotAvailable { first_available: Slot },
}

pub trait Subscribe {
    fn subscribe(
        &self,
        replay_from_slot: Option<Slot>,
        filter: Option<RichatFilter>,
    ) -> Result<RecvStream, SubscribeError>;
}

#[derive(Debug)]
pub struct WriteVectored<'a, W: ?Sized> {
    writer: &'a mut W,
    buffers: &'a mut [IoSlice<'a>],
    offset: usize,
}

impl<'a, W> WriteVectored<'a, W> {
    pub fn new(writer: &'a mut W, buffers: &'a mut [IoSlice<'a>]) -> Self {
        Self {
            writer,
            buffers,
            offset: 0,
        }
    }
}

impl<'a, W> Future for WriteVectored<'a, W>
where
    W: AsyncWrite + Unpin + ?Sized,
{
    type Output = io::Result<()>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let me = unsafe { self.get_unchecked_mut() };
        while me.offset < me.buffers.len() {
            let bufs = &me.buffers[me.offset..];
            let mut n = ready!(Pin::new(&mut *me.writer).poll_write_vectored(cx, bufs))?;
            if n == 0 {
                return Poll::Ready(Err(io::ErrorKind::WriteZero.into()));
            }

            while n > 0 {
                if n >= me.buffers[me.offset].len() {
                    n -= me.buffers[me.offset].len();
                    me.offset += 1;
                    continue;
                }

                me.buffers[me.offset].advance(n);
                n = 0;
            }
        }
        Poll::Ready(Ok(()))
    }
}