iridis_node/
message.rs

//! This module defines the message types used for dataflow communication
2
3use arrow_data::ArrayData;
4use tokio::sync::mpsc::{Receiver, Sender};
5use uuid::Uuid;
6
7use crate::prelude::*;
8
9use uhlc::Timestamp;
10
11/// Header for a dataflow message
12#[derive(Debug, PartialEq, Clone)]
13pub struct Header {
14    /// Timestamp of the message, representing when the message was created by the runtime (sender side)
15    pub timestamp: Timestamp,
16
17    /// Identifier of the message, representing the source node uuid and the IO it's coming from (output, query or queryable)
18    pub source: (Uuid, Uuid),
19}
20
21/// Dataflow message. Cheap to clone
22#[derive(Debug, PartialEq, Clone)]
23pub struct DataflowMessage {
24    pub header: Header,
25    pub data: ArrayData,
26}
27
28/// MPSC Message sender. Can be cloned, cheap to clone
29pub type MessageSender = Sender<DataflowMessage>;
30
31/// MPSC Message receiver. Cannot be cloned
32pub type MessageReceiver = Receiver<DataflowMessage>;
33
34/// Typed dataflow message
35#[derive(Debug, PartialEq, Clone)]
36pub struct TypedDataflowMessage<T: ArrowMessage> {
37    pub header: Header,
38    pub data: T,
39}
40
41impl<T> TryFrom<DataflowMessage> for TypedDataflowMessage<T>
42where
43    T: ArrowMessage,
44{
45    type Error = eyre::Report;
46
47    fn try_from(value: DataflowMessage) -> Result<Self> {
48        let data = T::try_from_arrow(value.data)?;
49
50        Ok(Self {
51            header: value.header,
52            data,
53        })
54    }
55}