Skip to main content

hermod/mux/
client.rs

//! Mux-aware trace-forward client
//!
//! This module implements a trace-forward client that uses the Pallas multiplexer
//! to communicate with hermod-tracer over the Ouroboros Network protocol.
5
6use crate::protocol::{Message, MsgTraceObjectsReply, TraceObject};
7use pallas_network::multiplexer;
8use thiserror::Error;
9use tracing::{debug, info};
10
11/// Errors that can occur in the mux client
12#[derive(Error, Debug)]
13pub enum ClientError {
14    /// Error from the underlying multiplexer
15    #[error("multiplexer error: {0}")]
16    Multiplexer(#[from] multiplexer::Error),
17
18    /// An unexpected message was received from the acceptor
19    #[error("invalid inbound message")]
20    InvalidInbound,
21
22    /// The connection was closed by the remote end
23    #[error("connection closed")]
24    ConnectionClosed,
25}
26
27/// Mux-aware trace-forward client
28pub struct TraceForwardClient {
29    channel: multiplexer::ChannelBuffer,
30}
31
32impl TraceForwardClient {
33    /// Create a new client from a multiplexer channel
34    pub fn new(channel: multiplexer::AgentChannel) -> Self {
35        Self {
36            channel: multiplexer::ChannelBuffer::new(channel),
37        }
38    }
39
40    /// Send a message to the acceptor
41    pub async fn send_message(&mut self, msg: &Message) -> Result<(), ClientError> {
42        debug!("Sending message");
43        self.channel
44            .send_msg_chunks(msg)
45            .await
46            .map_err(ClientError::Multiplexer)?;
47        Ok(())
48    }
49
50    /// Receive a message from the acceptor
51    pub async fn recv_message(&mut self) -> Result<Message, ClientError> {
52        let msg = self
53            .channel
54            .recv_full_msg()
55            .await
56            .map_err(ClientError::Multiplexer)?;
57        debug!("Received message");
58        Ok(msg)
59    }
60
61    /// Wait for a trace objects request from the acceptor and send traces
62    pub async fn handle_request(&mut self, traces: Vec<TraceObject>) -> Result<(), ClientError> {
63        // Wait for request
64        let msg = self.recv_message().await?;
65
66        match msg {
67            Message::TraceObjectsRequest(req) => {
68                debug!(
69                    "Received request for {} traces (blocking: {})",
70                    req.number_of_trace_objects, req.blocking
71                );
72
73                // Take up to the requested number of traces
74                let to_send = traces
75                    .into_iter()
76                    .take(req.number_of_trace_objects as usize)
77                    .collect();
78
79                // Send reply
80                let reply = Message::TraceObjectsReply(MsgTraceObjectsReply {
81                    trace_objects: to_send,
82                });
83
84                self.send_message(&reply).await?;
85                Ok(())
86            }
87            Message::Done => {
88                info!("Received Done message");
89                Err(ClientError::ConnectionClosed)
90            }
91            _ => Err(ClientError::InvalidInbound),
92        }
93    }
94
95    /// Send a Done message to close the connection gracefully
96    pub async fn send_done(&mut self) -> Result<(), ClientError> {
97        self.send_message(&Message::Done).await
98    }
99}