ort_tcp/
client.rs

//! TODO: TCP clients should automatically reconnect, but they don't.
2
3use crate::{muxer, preface, ReplyCodec, SpecCodec};
4use ort_core::{Error, MakeOrt, Ort, Reply, Spec};
5use tokio::{
6    io,
7    net::TcpStream,
8    sync::{mpsc, oneshot},
9};
10use tokio_util::codec::{FramedRead, FramedWrite};
11use tracing::{debug, debug_span};
12
/// Factory for [`Tcp`] clients.
///
/// Holds the buffer capacity that `make_ort` passes to `muxer::spawn_client`
/// for every connection it creates.
#[derive(Clone, Debug)]
pub struct MakeTcp {
    /// Capacity handed to `muxer::spawn_client` when a connection is created.
    buffer_capacity: usize,
}
17
18#[derive(Clone)]
19pub struct Tcp {
20    tx: mpsc::Sender<(Spec, oneshot::Sender<Reply>)>,
21}
22
23impl MakeTcp {
24    pub fn new(buffer_capacity: usize) -> Self {
25        Self { buffer_capacity }
26    }
27}
28
29#[async_trait::async_trait]
30impl MakeOrt<String> for MakeTcp {
31    type Ort = Tcp;
32
33    async fn make_ort(&mut self, target: String) -> Result<Tcp, Error> {
34        debug!(%target, "Initializing a new connection");
35        let stream = TcpStream::connect(target).await?;
36        stream.set_nodelay(true)?;
37
38        let local = stream.local_addr()?;
39        let peer = stream.peer_addr()?;
40        let (rio, wio) = stream.into_split();
41        let write = FramedWrite::new(
42            wio,
43            preface::Codec::from(muxer::FramedEncode::from(SpecCodec::default())),
44        );
45        let read = FramedRead::new(rio, muxer::FramedDecode::from(ReplyCodec::default()));
46        let tx = debug_span!("conn", %local, %peer)
47            .in_scope(|| muxer::spawn_client(write, read, self.buffer_capacity));
48
49        Ok(Tcp { tx })
50    }
51}
52
53#[async_trait::async_trait]
54impl Ort for Tcp {
55    async fn ort(&mut self, spec: Spec) -> Result<Reply, Error> {
56        let (tx, rx) = oneshot::channel();
57        self.tx
58            .send((spec, tx))
59            .await
60            .map_err(|_| io::Error::new(io::ErrorKind::NotConnected, "Muxer lost"))?;
61        rx.await.map_err(|_| {
62            io::Error::new(io::ErrorKind::NotConnected, "Muxer dropped response").into()
63        })
64    }
65}