Skip to main content

hermod/server/
datapoint.rs

1//! DataPoint mini-protocol client (protocol 3)
2//!
3//! The acceptor-side can request named data points from the connected forwarder.
4//!
5//! ## Wire protocol (`trace-forward` DataPoint protocol)
6//!
7//! | Message | CBOR |
8//! |---------|------|
9//! | `MsgDataPointsRequest([String])` | `array(2)\[1, [name...]\]` |
10//! | `MsgDataPointsReply([(String, Option<Bytes>)])` | `array(2)\[3, [[name, maybe_value]...]\]` |
11//! | `MsgDone` | `array(1)\[2\]` |
12//!
13//! `DataPointValue` is raw JSON bytes (lazy bytestring in Haskell → `Vec<u8>` here).
14
15use pallas_codec::minicbor::{self, Decode, Decoder, Encode, Encoder};
16use pallas_network::multiplexer::{ChannelBuffer, Error};
17use tracing::debug;
18
19// ---------------------------------------------------------------------------
20// Protocol message types
21// ---------------------------------------------------------------------------
22
23/// Messages in the DataPoint mini-protocol
24#[derive(Debug)]
25pub enum DataPointMessage {
26    /// Request the given named data points
27    Request(Vec<String>),
28    /// Reply with values (None if a named point doesn't exist)
29    Reply(Vec<(String, Option<Vec<u8>>)>),
30    /// Terminate the session
31    Done,
32}
33
34impl Encode<()> for DataPointMessage {
35    fn encode<W: minicbor::encode::Write>(
36        &self,
37        e: &mut Encoder<W>,
38        _ctx: &mut (),
39    ) -> Result<(), minicbor::encode::Error<W::Error>> {
40        match self {
41            DataPointMessage::Request(names) => {
42                e.array(2)?.u8(1)?;
43                e.array(names.len() as u64)?;
44                for n in names {
45                    e.str(n)?;
46                }
47            }
48            DataPointMessage::Reply(items) => {
49                e.array(2)?.u8(3)?;
50                e.array(items.len() as u64)?;
51                for (name, value) in items {
52                    e.array(2)?;
53                    e.str(name)?;
54                    match value {
55                        None => {
56                            e.array(0)?;
57                        }
58                        Some(bytes) => {
59                            e.array(1)?;
60                            e.bytes(bytes)?;
61                        }
62                    }
63                }
64            }
65            DataPointMessage::Done => {
66                e.array(1)?.u8(2)?;
67            }
68        }
69        Ok(())
70    }
71}
72
73impl<'b> Decode<'b, ()> for DataPointMessage {
74    fn decode(d: &mut Decoder<'b>, _ctx: &mut ()) -> Result<Self, minicbor::decode::Error> {
75        d.array()?;
76        let tag = d.u8()?;
77        match tag {
78            1 => {
79                let mut names = Vec::new();
80                for item in d.array_iter::<String>()? {
81                    names.push(item?);
82                }
83                Ok(DataPointMessage::Request(names))
84            }
85            2 => Ok(DataPointMessage::Done),
86            3 => {
87                // The Haskell `serialise` library encodes non-empty lists as
88                // indefinite-length arrays (0x9F ... 0xFF).  We must handle
89                // both definite (Some(n)) and indefinite (None) cases.
90                let count = d.array()?;
91                let mut items = Vec::new();
92
93                match count {
94                    Some(n) => {
95                        for _ in 0..n {
96                            items.push(decode_reply_item(d)?);
97                        }
98                    }
99                    None => {
100                        // Indefinite array: read items until Break (0xFF).
101                        loop {
102                            if d.datatype()? == minicbor::data::Type::Break {
103                                d.skip()?; // consume the break token
104                                break;
105                            }
106                            items.push(decode_reply_item(d)?);
107                        }
108                    }
109                }
110
111                Ok(DataPointMessage::Reply(items))
112            }
113            _ => Err(minicbor::decode::Error::message("unknown DataPoint tag")),
114        }
115    }
116}
117
118/// Decode one `(DataPointName, Maybe LBS.ByteString)` item.
119///
120/// The Haskell `Serialise (a, b)` instance writes `array(2)[a, b]`.
121/// `Maybe LBS.ByteString` is `array(0)` for `Nothing` or `array(1)[bytes]`
122/// for `Just bs`.  The bytes may be indefinite-length (Haskell encodes
123/// `LBS.ByteString` via `encodeBytesIndef`), so we use `bytes_iter()` which
124/// handles both definite and indefinite byte strings.
125fn decode_reply_item(
126    d: &mut Decoder<'_>,
127) -> Result<(String, Option<Vec<u8>>), minicbor::decode::Error> {
128    d.array()?; // array(2) for the tuple
129    let name = d.str()?.to_string();
130
131    let maybe_len = d.array()?; // array(0) = Nothing, array(1) = Just
132    let value = match maybe_len {
133        Some(0) => None,
134        Some(1) => {
135            // Collect bytes from all chunks (handles both definite and
136            // indefinite-length byte strings from Haskell LBS.ByteString).
137            let mut buf = Vec::new();
138            for chunk in d.bytes_iter()? {
139                buf.extend_from_slice(chunk?);
140            }
141            Some(buf)
142        }
143        _ => {
144            return Err(minicbor::decode::Error::message(
145                "invalid Maybe encoding in DataPointsReply",
146            ));
147        }
148    };
149
150    Ok((name, value))
151}
152
153// ---------------------------------------------------------------------------
154// DataPoint client
155// ---------------------------------------------------------------------------
156
157/// Holds the DataPoint channel and allows on-demand queries
158pub struct DataPointClient {
159    channel: ChannelBuffer,
160}
161
162impl DataPointClient {
163    /// Create a new DataPoint client
164    pub fn new(channel: pallas_network::multiplexer::AgentChannel) -> Self {
165        DataPointClient {
166            channel: ChannelBuffer::new(channel),
167        }
168    }
169
170    /// Request the given named data points and return the responses.
171    /// Each value is parsed as JSON if possible.
172    pub async fn request(
173        &mut self,
174        names: Vec<String>,
175    ) -> Result<Vec<(String, Option<serde_json::Value>)>, Error> {
176        self.channel
177            .send_msg_chunks(&DataPointMessage::Request(names))
178            .await?;
179
180        let msg: DataPointMessage = self.channel.recv_full_msg().await?;
181        match msg {
182            DataPointMessage::Reply(items) => {
183                let parsed = items
184                    .into_iter()
185                    .map(|(name, bytes)| {
186                        let value = bytes.and_then(|b| serde_json::from_slice(&b).ok());
187                        (name, value)
188                    })
189                    .collect();
190                Ok(parsed)
191            }
192            DataPointMessage::Done => Err(Error::Decoding("DataPoint connection closed".into())),
193            DataPointMessage::Request(_) => {
194                Err(Error::Decoding("unexpected Request from forwarder".into()))
195            }
196        }
197    }
198
199    /// Hold the channel open indefinitely (keeps the mux alive)
200    /// without making any requests. Returns when the channel closes.
201    pub async fn run_idle_loop(mut self) {
202        // The forwarder drives the protocol; we just need to keep the channel
203        // alive so the mux doesn't stall. We can wait for the Done message.
204        loop {
205            match self.channel.recv_full_msg::<DataPointMessage>().await {
206                Ok(DataPointMessage::Done) => {
207                    debug!("DataPoint: remote sent Done");
208                    return;
209                }
210                Ok(_) => {
211                    // Unexpected message while idle; ignore
212                }
213                Err(_) => {
214                    return;
215                }
216            }
217        }
218    }
219}