tokio_hglib/
protocol.rs

1//! Low-level building blocks for the command server protocol.
2
3use bytes::Bytes;
4use futures::SinkExt;
5use std::io;
6use tokio_stream::{self as stream, StreamExt};
7
8use crate::codec::{BlockMessage, ChannelMessage};
9use crate::connection::Connection;
10
11/// Connection wrapper to process channel requests and responses.
12///
13/// This provides a low-level interface to communicate with the Mercurial
14/// command server.
15#[derive(Debug)]
16pub struct Protocol<C> {
17    conn: C,
18}
19
20impl<C> Protocol<C> {
21    /// Creates a new Protocol that wraps the given connection.
22    pub fn new(conn: C) -> Self {
23        Protocol { conn }
24    }
25
26    /// Unwraps the underlying `Connection`.
27    pub fn into_connection(self) -> C {
28        self.conn
29    }
30}
31
32#[cfg(unix)]
33mod unix {
34    use super::*;
35    use std::os::unix::io::{AsRawFd, RawFd};
36
37    impl<C> AsRawFd for Protocol<C>
38    where
39        C: AsRawFd,
40    {
41        fn as_raw_fd(&self) -> RawFd {
42            self.conn.as_raw_fd()
43        }
44    }
45}
46
47impl<C> Protocol<C>
48where
49    C: Connection,
50{
51    /// Sends the command of no argument without waiting for response.
52    ///
53    /// This is equivalent to `OneShotRequest::start()` of tokio-hglib 0.2.
54    /// For `MessageLoop::start()`, call this function and fetch responses.
55    pub async fn send_command(&mut self, cmd: impl Into<Bytes>) -> io::Result<()> {
56        self.conn
57            .get_tx_mut()
58            .send(BlockMessage::Command(cmd.into()))
59            .await?;
60        Ok(())
61    }
62
63    /// Sends the command and arguments without waiting for response.
64    ///
65    /// This is equivalent to `OneShotRequest::start_with_args()` of tokio-hglib 0.2.
66    /// For `MessageLoop::start_with_args()`, call this function and fetch responses.
67    pub async fn send_command_with_args(
68        &mut self,
69        cmd: impl Into<Bytes>,
70        packed_args: impl Into<Bytes>,
71    ) -> io::Result<()> {
72        let blocks = vec![
73            Ok(BlockMessage::Command(cmd.into())),
74            Ok(BlockMessage::Data(packed_args.into())),
75        ];
76        self.conn
77            .get_tx_mut()
78            .send_all(&mut stream::iter(blocks))
79            .await?;
80        Ok(())
81    }
82
83    /// Sends the given data back to the server.
84    ///
85    /// For `MessageLoop::resume_with_data()` of tokio-hglib 0.2, call this function
86    /// and fetch responses.
87    pub async fn send_data(&mut self, data: impl Into<Bytes>) -> io::Result<()> {
88        self.conn
89            .get_tx_mut()
90            .send(BlockMessage::Data(data.into()))
91            .await?;
92        Ok(())
93    }
94
95    /// Sends the command of no argument, and fetches the result data.
96    ///
97    /// This is equivalent to `OneShotQuery::start()` of tokio-hglib 0.2.
98    pub async fn query(&mut self, cmd: impl Into<Bytes>) -> io::Result<Bytes> {
99        self.send_command(cmd).await?;
100        self.fetch_result().await
101    }
102
103    /// Sends the command and arguments, and fetches the result data.
104    ///
105    /// This is equivalent to `OneShotQuery::start_with_args()` of tokio-hglib 0.2.
106    pub async fn query_with_args(
107        &mut self,
108        cmd: impl Into<Bytes>,
109        packed_args: impl Into<Bytes>,
110    ) -> io::Result<Bytes> {
111        self.send_command_with_args(cmd, packed_args).await?;
112        self.fetch_result().await
113    }
114
115    /// Fetches response message from the server.
116    ///
117    /// This is equivalent to `MessageLoop::resume()` of tokio-hglib 0.2.
118    pub async fn fetch_response(&mut self) -> io::Result<ChannelMessage> {
119        let v = self.conn.get_rx_mut().try_next().await?;
120        expect_msg(v)
121    }
122
123    async fn fetch_result(&mut self) -> io::Result<Bytes> {
124        loop {
125            match self.fetch_response().await? {
126                ChannelMessage::Data(b'r', data) => {
127                    return Ok(data);
128                }
129                ChannelMessage::Data(..) => {
130                    // just ignore data sent to uninteresting (optional) channel
131                }
132                ChannelMessage::InputRequest(..)
133                | ChannelMessage::LineRequest(..)
134                | ChannelMessage::SystemRequest(..) => {
135                    return Err(io::Error::new(
136                        io::ErrorKind::InvalidData,
137                        "unsupported request while querying",
138                    ));
139                }
140            }
141        }
142    }
143}
144
145fn expect_msg(v: Option<ChannelMessage>) -> Result<ChannelMessage, io::Error> {
146    v.ok_or(io::Error::new(
147        io::ErrorKind::UnexpectedEof,
148        "no result code received",
149    ))
150}