richat_shared/transports/
mod.rs1pub mod grpc;
2pub mod quic;
3
4use {
5 futures::stream::BoxStream,
6 richat_proto::richat::RichatFilter,
7 solana_sdk::clock::Slot,
8 std::{
9 future::Future,
10 io::{self, IoSlice},
11 pin::Pin,
12 sync::Arc,
13 task::{Context, Poll, ready},
14 },
15 thiserror::Error,
16 tokio::io::AsyncWrite,
17};
18
/// A single received message: a reference-counted, immutable byte buffer.
///
/// Cloning an item only bumps the `Arc` reference count; the bytes themselves
/// are shared between all holders.
pub type RecvItem = Arc<Vec<u8>>;
20
21pub type RecvStream = BoxStream<'static, Result<RecvItem, RecvError>>;
22
23#[derive(Debug, Clone, Copy, PartialEq, Eq, Error)]
24pub enum RecvError {
25 #[error("channel lagged")]
26 Lagged,
27 #[error("channel closed")]
28 Closed,
29}
30
31#[derive(Debug, Error)]
32pub enum SubscribeError {
33 #[error("channel is not initialized yet")]
34 NotInitialized,
35 #[error("only available from slot {first_available}")]
36 SlotNotAvailable { first_available: Slot },
37}
38
39pub trait Subscribe {
40 fn subscribe(
41 &self,
42 replay_from_slot: Option<Slot>,
43 filter: Option<RichatFilter>,
44 ) -> Result<RecvStream, SubscribeError>;
45}
46
/// Future that writes every byte of a list of [`IoSlice`] buffers to a writer.
///
/// The buffer list is borrowed mutably because progress is recorded in place:
/// a partially written slice is advanced past the bytes already written, so
/// the caller's slices are modified while the future runs.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct WriteVectored<'a, W: ?Sized> {
    /// Destination of the write.
    writer: &'a mut W,
    /// Buffers to write, in order.
    buffers: &'a mut [IoSlice<'a>],
    /// Index of the first buffer in `buffers` that is not fully written yet.
    offset: usize,
}

// `?Sized` mirrors the bound on the struct itself, so the constructor is
// usable with unsized writers (e.g. trait objects) as well.
impl<'a, W: ?Sized> WriteVectored<'a, W> {
    /// Creates a future that will write all of `buffers` to `writer`,
    /// starting with the first buffer.
    pub const fn new(writer: &'a mut W, buffers: &'a mut [IoSlice<'a>]) -> Self {
        Self {
            writer,
            buffers,
            offset: 0,
        }
    }
}
63
64impl<W> Future for WriteVectored<'_, W>
65where
66 W: AsyncWrite + Unpin + ?Sized,
67{
68 type Output = io::Result<()>;
69
70 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
71 let me = unsafe { self.get_unchecked_mut() };
72 while me.offset < me.buffers.len() {
73 let bufs = &me.buffers[me.offset..];
74 let mut n = ready!(Pin::new(&mut *me.writer).poll_write_vectored(cx, bufs))?;
75 if n == 0 {
76 return Poll::Ready(Err(io::ErrorKind::WriteZero.into()));
77 }
78
79 while n > 0 {
80 if n >= me.buffers[me.offset].len() {
81 n -= me.buffers[me.offset].len();
82 me.offset += 1;
83 continue;
84 }
85
86 me.buffers[me.offset].advance(n);
87 n = 0;
88 }
89 }
90 Poll::Ready(Ok(()))
91 }
92}