hermod/server/datapoint.rs
1//! DataPoint mini-protocol client (protocol 3)
2//!
3//! The acceptor-side can request named data points from the connected forwarder.
4//!
5//! ## Wire protocol (`trace-forward` DataPoint protocol)
6//!
7//! | Message | CBOR |
8//! |---------|------|
9//! | `MsgDataPointsRequest([String])` | `array(2)\[1, [name...]\]` |
10//! | `MsgDataPointsReply([(String, Option<Bytes>)])` | `array(2)\[3, [[name, maybe_value]...]\]` |
11//! | `MsgDone` | `array(1)\[2\]` |
12//!
13//! `DataPointValue` is raw JSON bytes (lazy bytestring in Haskell → `Vec<u8>` here).
14
15use pallas_codec::minicbor::{self, Decode, Decoder, Encode, Encoder};
16use pallas_network::multiplexer::{ChannelBuffer, Error};
17use tracing::debug;
18
19// ---------------------------------------------------------------------------
20// Protocol message types
21// ---------------------------------------------------------------------------
22
23/// Messages in the DataPoint mini-protocol
24#[derive(Debug)]
25pub enum DataPointMessage {
26 /// Request the given named data points
27 Request(Vec<String>),
28 /// Reply with values (None if a named point doesn't exist)
29 Reply(Vec<(String, Option<Vec<u8>>)>),
30 /// Terminate the session
31 Done,
32}
33
34impl Encode<()> for DataPointMessage {
35 fn encode<W: minicbor::encode::Write>(
36 &self,
37 e: &mut Encoder<W>,
38 _ctx: &mut (),
39 ) -> Result<(), minicbor::encode::Error<W::Error>> {
40 match self {
41 DataPointMessage::Request(names) => {
42 e.array(2)?.u8(1)?;
43 e.array(names.len() as u64)?;
44 for n in names {
45 e.str(n)?;
46 }
47 }
48 DataPointMessage::Reply(items) => {
49 e.array(2)?.u8(3)?;
50 e.array(items.len() as u64)?;
51 for (name, value) in items {
52 e.array(2)?;
53 e.str(name)?;
54 match value {
55 None => {
56 e.array(0)?;
57 }
58 Some(bytes) => {
59 e.array(1)?;
60 e.bytes(bytes)?;
61 }
62 }
63 }
64 }
65 DataPointMessage::Done => {
66 e.array(1)?.u8(2)?;
67 }
68 }
69 Ok(())
70 }
71}
72
73impl<'b> Decode<'b, ()> for DataPointMessage {
74 fn decode(d: &mut Decoder<'b>, _ctx: &mut ()) -> Result<Self, minicbor::decode::Error> {
75 d.array()?;
76 let tag = d.u8()?;
77 match tag {
78 1 => {
79 let mut names = Vec::new();
80 for item in d.array_iter::<String>()? {
81 names.push(item?);
82 }
83 Ok(DataPointMessage::Request(names))
84 }
85 2 => Ok(DataPointMessage::Done),
86 3 => {
87 // The Haskell `serialise` library encodes non-empty lists as
88 // indefinite-length arrays (0x9F ... 0xFF). We must handle
89 // both definite (Some(n)) and indefinite (None) cases.
90 let count = d.array()?;
91 let mut items = Vec::new();
92
93 match count {
94 Some(n) => {
95 for _ in 0..n {
96 items.push(decode_reply_item(d)?);
97 }
98 }
99 None => {
100 // Indefinite array: read items until Break (0xFF).
101 loop {
102 if d.datatype()? == minicbor::data::Type::Break {
103 d.skip()?; // consume the break token
104 break;
105 }
106 items.push(decode_reply_item(d)?);
107 }
108 }
109 }
110
111 Ok(DataPointMessage::Reply(items))
112 }
113 _ => Err(minicbor::decode::Error::message("unknown DataPoint tag")),
114 }
115 }
116}
117
118/// Decode one `(DataPointName, Maybe LBS.ByteString)` item.
119///
120/// The Haskell `Serialise (a, b)` instance writes `array(2)[a, b]`.
121/// `Maybe LBS.ByteString` is `array(0)` for `Nothing` or `array(1)[bytes]`
122/// for `Just bs`. The bytes may be indefinite-length (Haskell encodes
123/// `LBS.ByteString` via `encodeBytesIndef`), so we use `bytes_iter()` which
124/// handles both definite and indefinite byte strings.
125fn decode_reply_item(
126 d: &mut Decoder<'_>,
127) -> Result<(String, Option<Vec<u8>>), minicbor::decode::Error> {
128 d.array()?; // array(2) for the tuple
129 let name = d.str()?.to_string();
130
131 let maybe_len = d.array()?; // array(0) = Nothing, array(1) = Just
132 let value = match maybe_len {
133 Some(0) => None,
134 Some(1) => {
135 // Collect bytes from all chunks (handles both definite and
136 // indefinite-length byte strings from Haskell LBS.ByteString).
137 let mut buf = Vec::new();
138 for chunk in d.bytes_iter()? {
139 buf.extend_from_slice(chunk?);
140 }
141 Some(buf)
142 }
143 _ => {
144 return Err(minicbor::decode::Error::message(
145 "invalid Maybe encoding in DataPointsReply",
146 ));
147 }
148 };
149
150 Ok((name, value))
151}
152
153// ---------------------------------------------------------------------------
154// DataPoint client
155// ---------------------------------------------------------------------------
156
157/// Holds the DataPoint channel and allows on-demand queries
158pub struct DataPointClient {
159 channel: ChannelBuffer,
160}
161
162impl DataPointClient {
163 /// Create a new DataPoint client
164 pub fn new(channel: pallas_network::multiplexer::AgentChannel) -> Self {
165 DataPointClient {
166 channel: ChannelBuffer::new(channel),
167 }
168 }
169
170 /// Request the given named data points and return the responses.
171 /// Each value is parsed as JSON if possible.
172 pub async fn request(
173 &mut self,
174 names: Vec<String>,
175 ) -> Result<Vec<(String, Option<serde_json::Value>)>, Error> {
176 self.channel
177 .send_msg_chunks(&DataPointMessage::Request(names))
178 .await?;
179
180 let msg: DataPointMessage = self.channel.recv_full_msg().await?;
181 match msg {
182 DataPointMessage::Reply(items) => {
183 let parsed = items
184 .into_iter()
185 .map(|(name, bytes)| {
186 let value = bytes.and_then(|b| serde_json::from_slice(&b).ok());
187 (name, value)
188 })
189 .collect();
190 Ok(parsed)
191 }
192 DataPointMessage::Done => Err(Error::Decoding("DataPoint connection closed".into())),
193 DataPointMessage::Request(_) => {
194 Err(Error::Decoding("unexpected Request from forwarder".into()))
195 }
196 }
197 }
198
199 /// Hold the channel open indefinitely (keeps the mux alive)
200 /// without making any requests. Returns when the channel closes.
201 pub async fn run_idle_loop(mut self) {
202 // The forwarder drives the protocol; we just need to keep the channel
203 // alive so the mux doesn't stall. We can wait for the Done message.
204 loop {
205 match self.channel.recv_full_msg::<DataPointMessage>().await {
206 Ok(DataPointMessage::Done) => {
207 debug!("DataPoint: remote sent Done");
208 return;
209 }
210 Ok(_) => {
211 // Unexpected message while idle; ignore
212 }
213 Err(_) => {
214 return;
215 }
216 }
217 }
218 }
219}