use pallas_codec::minicbor::{self, Decode, Decoder, Encode, Encoder};
use pallas_network::multiplexer::{ChannelBuffer, Error};
use tracing::debug;
#[derive(Debug)]
pub enum DataPointMessage {
Request(Vec<String>),
Reply(Vec<(String, Option<Vec<u8>>)>),
Done,
}
impl Encode<()> for DataPointMessage {
fn encode<W: minicbor::encode::Write>(
&self,
e: &mut Encoder<W>,
_ctx: &mut (),
) -> Result<(), minicbor::encode::Error<W::Error>> {
match self {
DataPointMessage::Request(names) => {
e.array(2)?.u8(1)?;
e.array(names.len() as u64)?;
for n in names {
e.str(n)?;
}
}
DataPointMessage::Reply(items) => {
e.array(2)?.u8(3)?;
e.array(items.len() as u64)?;
for (name, value) in items {
e.array(2)?;
e.str(name)?;
match value {
None => {
e.array(0)?;
}
Some(bytes) => {
e.array(1)?;
e.bytes(bytes)?;
}
}
}
}
DataPointMessage::Done => {
e.array(1)?.u8(2)?;
}
}
Ok(())
}
}
impl<'b> Decode<'b, ()> for DataPointMessage {
fn decode(d: &mut Decoder<'b>, _ctx: &mut ()) -> Result<Self, minicbor::decode::Error> {
d.array()?;
let tag = d.u8()?;
match tag {
1 => {
let mut names = Vec::new();
for item in d.array_iter::<String>()? {
names.push(item?);
}
Ok(DataPointMessage::Request(names))
}
2 => Ok(DataPointMessage::Done),
3 => {
let count = d.array()?;
let mut items = Vec::new();
match count {
Some(n) => {
for _ in 0..n {
items.push(decode_reply_item(d)?);
}
}
None => {
loop {
if d.datatype()? == minicbor::data::Type::Break {
d.skip()?; break;
}
items.push(decode_reply_item(d)?);
}
}
}
Ok(DataPointMessage::Reply(items))
}
_ => Err(minicbor::decode::Error::message("unknown DataPoint tag")),
}
}
}
fn decode_reply_item(
d: &mut Decoder<'_>,
) -> Result<(String, Option<Vec<u8>>), minicbor::decode::Error> {
d.array()?; let name = d.str()?.to_string();
let maybe_len = d.array()?; let value = match maybe_len {
Some(0) => None,
Some(1) => {
let mut buf = Vec::new();
for chunk in d.bytes_iter()? {
buf.extend_from_slice(chunk?);
}
Some(buf)
}
_ => {
return Err(minicbor::decode::Error::message(
"invalid Maybe encoding in DataPointsReply",
));
}
};
Ok((name, value))
}
pub struct DataPointClient {
channel: ChannelBuffer,
}
impl DataPointClient {
pub fn new(channel: pallas_network::multiplexer::AgentChannel) -> Self {
DataPointClient {
channel: ChannelBuffer::new(channel),
}
}
pub async fn request(
&mut self,
names: Vec<String>,
) -> Result<Vec<(String, Option<serde_json::Value>)>, Error> {
self.channel
.send_msg_chunks(&DataPointMessage::Request(names))
.await?;
let msg: DataPointMessage = self.channel.recv_full_msg().await?;
match msg {
DataPointMessage::Reply(items) => {
let parsed = items
.into_iter()
.map(|(name, bytes)| {
let value = bytes.and_then(|b| serde_json::from_slice(&b).ok());
(name, value)
})
.collect();
Ok(parsed)
}
DataPointMessage::Done => Err(Error::Decoding("DataPoint connection closed".into())),
DataPointMessage::Request(_) => {
Err(Error::Decoding("unexpected Request from forwarder".into()))
}
}
}
pub async fn run_idle_loop(mut self) {
loop {
match self.channel.recv_full_msg::<DataPointMessage>().await {
Ok(DataPointMessage::Done) => {
debug!("DataPoint: remote sent Done");
return;
}
Ok(_) => {
}
Err(_) => {
return;
}
}
}
}
}