1use crate::protocol::{Message, MsgTraceObjectsReply, TraceObject};
7use pallas_network::multiplexer;
8use thiserror::Error;
9use tracing::{debug, info};
10
11#[derive(Error, Debug)]
13pub enum ClientError {
14 #[error("multiplexer error: {0}")]
16 Multiplexer(#[from] multiplexer::Error),
17
18 #[error("invalid inbound message")]
20 InvalidInbound,
21
22 #[error("connection closed")]
24 ConnectionClosed,
25}
26
/// Client that exchanges [`Message`] values with a peer over a single
/// buffered multiplexer channel.
pub struct TraceForwardClient {
    /// Buffered wrapper around the agent channel passed to `new`; every send
    /// and receive in this type goes through it.
    channel: multiplexer::ChannelBuffer,
}
31
32impl TraceForwardClient {
33 pub fn new(channel: multiplexer::AgentChannel) -> Self {
35 Self {
36 channel: multiplexer::ChannelBuffer::new(channel),
37 }
38 }
39
40 pub async fn send_message(&mut self, msg: &Message) -> Result<(), ClientError> {
42 debug!("Sending message");
43 self.channel
44 .send_msg_chunks(msg)
45 .await
46 .map_err(ClientError::Multiplexer)?;
47 Ok(())
48 }
49
50 pub async fn recv_message(&mut self) -> Result<Message, ClientError> {
52 let msg = self
53 .channel
54 .recv_full_msg()
55 .await
56 .map_err(ClientError::Multiplexer)?;
57 debug!("Received message");
58 Ok(msg)
59 }
60
61 pub async fn handle_request(&mut self, traces: Vec<TraceObject>) -> Result<(), ClientError> {
63 let msg = self.recv_message().await?;
65
66 match msg {
67 Message::TraceObjectsRequest(req) => {
68 debug!(
69 "Received request for {} traces (blocking: {})",
70 req.number_of_trace_objects, req.blocking
71 );
72
73 let to_send = traces
75 .into_iter()
76 .take(req.number_of_trace_objects as usize)
77 .collect();
78
79 let reply = Message::TraceObjectsReply(MsgTraceObjectsReply {
81 trace_objects: to_send,
82 });
83
84 self.send_message(&reply).await?;
85 Ok(())
86 }
87 Message::Done => {
88 info!("Received Done message");
89 Err(ClientError::ConnectionClosed)
90 }
91 _ => Err(ClientError::InvalidInbound),
92 }
93 }
94
95 pub async fn send_done(&mut self) -> Result<(), ClientError> {
97 self.send_message(&Message::Done).await
98 }
99}