richat_shared/transports/
mod.rs1pub mod grpc;
2pub mod quic;
3pub mod tcp;
4
5use {
6 futures::stream::BoxStream,
7 richat_proto::richat::RichatFilter,
8 solana_sdk::clock::Slot,
9 std::{
10 future::Future,
11 io::{self, IoSlice},
12 pin::Pin,
13 sync::Arc,
14 task::{ready, Context, Poll},
15 },
16 thiserror::Error,
17 tokio::io::AsyncWrite,
18};
19
/// Item type carried by [`RecvStream`]: a reference-counted byte buffer.
///
/// NOTE(review): presumably one already-encoded message shared between
/// subscribers via the `Arc` — confirm against the producers of the stream.
20pub type RecvItem = Arc<Vec<u8>>;
21
/// Boxed `'static` stream returned by [`Subscribe::subscribe`].
///
/// Each element is either a [`RecvItem`] or a [`RecvError`].
22pub type RecvStream = BoxStream<'static, Result<RecvItem, RecvError>>;
23
/// Error element that a [`RecvStream`] can yield in place of an item.
24#[derive(Debug, Clone, Copy, PartialEq, Eq, Error)]
25pub enum RecvError {
    /// Displayed as "channel lagged".
    /// NOTE(review): presumably the receiver fell behind the producer and
    /// missed messages — confirm with the channel implementation.
26 #[error("channel lagged")]
27 Lagged,
    /// Displayed as "channel closed".
    /// NOTE(review): presumably no further items will arrive — confirm.
28 #[error("channel closed")]
29 Closed,
30}
31
/// Error returned by [`Subscribe::subscribe`] when a stream cannot be created.
32#[derive(Debug, Error)]
33pub enum SubscribeError {
    /// Displayed as "channel is not initialized yet".
34 #[error("channel is not initialized yet")]
35 NotInitialized,
    /// The requested slot cannot be served; `first_available` is the slot
    /// reported in the message ("only available from slot {first_available}").
36 #[error("only available from slot {first_available}")]
37 SlotNotAvailable { first_available: Slot },
38}
39
/// Source of message streams for the transports declared in this module.
40pub trait Subscribe {
    /// Opens a new [`RecvStream`].
    ///
    /// * `replay_from_slot` — optional slot to start from.
    ///   NOTE(review): presumably `None` means "start from live data" — confirm
    ///   with the implementors.
    /// * `filter` — optional [`RichatFilter`] for the subscription; its exact
    ///   semantics are defined by the implementor.
    ///
    /// # Errors
    ///
    /// Returns a [`SubscribeError`] when the stream cannot be created.
41 fn subscribe(
42 &self,
43 replay_from_slot: Option<Slot>,
44 filter: Option<RichatFilter>,
45 ) -> Result<RecvStream, SubscribeError>;
46}
47
/// Future that writes a whole slice of [`IoSlice`] buffers to a writer using
/// vectored writes.
///
/// The buffers are borrowed mutably because a partially written buffer is
/// advanced in place while the future makes progress; after completion the
/// caller's `IoSlice`s must be considered consumed.
#[derive(Debug)]
pub struct WriteVectored<'a, W: ?Sized> {
    /// Destination of the bytes.
    writer: &'a mut W,
    /// Buffers to write, in order.
    buffers: &'a mut [IoSlice<'a>],
    /// Index of the first buffer that has not been written completely yet.
    offset: usize,
}

// `W: ?Sized` mirrors the bound on the struct and on the `Future` impl, so the
// future can also be built for unsized writers (e.g. trait objects).
impl<'a, W: ?Sized> WriteVectored<'a, W> {
    /// Creates a future that will write every byte of `buffers` to `writer`.
    ///
    /// Nothing is written until the returned future is polled.
    pub fn new(writer: &'a mut W, buffers: &'a mut [IoSlice<'a>]) -> Self {
        Self {
            writer,
            buffers,
            offset: 0,
        }
    }
}
64
65impl<W> Future for WriteVectored<'_, W>
66where
67 W: AsyncWrite + Unpin + ?Sized,
68{
69 type Output = io::Result<()>;
70
71 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
72 let me = unsafe { self.get_unchecked_mut() };
73 while me.offset < me.buffers.len() {
74 let bufs = &me.buffers[me.offset..];
75 let mut n = ready!(Pin::new(&mut *me.writer).poll_write_vectored(cx, bufs))?;
76 if n == 0 {
77 return Poll::Ready(Err(io::ErrorKind::WriteZero.into()));
78 }
79
80 while n > 0 {
81 if n >= me.buffers[me.offset].len() {
82 n -= me.buffers[me.offset].len();
83 me.offset += 1;
84 continue;
85 }
86
87 me.buffers[me.offset].advance(n);
88 n = 0;
89 }
90 }
91 Poll::Ready(Ok(()))
92 }
93}