Skip to main content

dora_message/
lib.rs

1//! Enable serialisation and deserialisation of capnproto messages
2//!
3
4#![allow(clippy::missing_safety_doc)]
5
6/// The version of the dora-message crate, read from `CARGO_PKG_VERSION` at compile time.
7pub const VERSION: &str = env!("CARGO_PKG_VERSION");
8
9pub use tarpc;
10pub use uhlc;
11
12pub mod common;
13pub mod config;
14pub mod descriptor;
15pub mod id;
16pub mod metadata;
17
18pub mod coordinator_to_daemon;
19pub mod daemon_to_coordinator;
20
21pub mod daemon_to_daemon;
22
23pub mod daemon_to_node;
24pub mod node_to_daemon;
25
26pub mod cli_to_coordinator;
27pub mod coordinator_to_cli;
28
29pub mod integration_testing_format;
30
31pub use arrow_data;
32pub use arrow_schema;
33use uuid::{Timestamp, Uuid};
34
35/// Unique identifier for a dataflow instance.
36///
37/// Dora assigns each dataflow instance a unique ID on start.
///
/// This is a plain type alias for [`uuid::Uuid`], not a distinct newtype.
38pub type DataflowId = uuid::Uuid;
39
40#[derive(
41    Debug, Clone, Copy, serde::Serialize, serde::Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash,
42)]
43pub struct SessionId(uuid::Uuid);
44
45impl SessionId {
46    pub fn generate() -> Self {
47        Self(Uuid::new_v7(Timestamp::now(uuid::NoContext)))
48    }
49
50    pub fn uuid(&self) -> uuid::Uuid {
51        self.0
52    }
53}
54
55#[derive(
56    Debug, Clone, Copy, serde::Serialize, serde::Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash,
57)]
58pub struct BuildId(uuid::Uuid);
59
60impl BuildId {
61    pub fn generate() -> Self {
62        Self(Uuid::new_v7(Timestamp::now(uuid::NoContext)))
63    }
64}
65
66impl std::fmt::Display for BuildId {
67    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68        write!(f, "BuildId({})", self.0)
69    }
70}
71
72/// Check whether a remote version string is compatible with this crate's version.
73pub fn check_version_compatibility(remote_version: &str) -> eyre::Result<()> {
74    let crate_version = current_crate_version();
75    let specified_version = semver::Version::parse(remote_version)
76        .map_err(|e| eyre::eyre!("failed to parse remote version `{remote_version}`: {e}"))?;
77    let compatible =
78        versions_compatible(&crate_version, &specified_version).map_err(|e| eyre::eyre!(e))?;
79    if compatible {
80        Ok(())
81    } else {
82        Err(eyre::eyre!(
83            "version mismatch: remote message format v{specified_version} is not compatible \
84            with local message format v{crate_version}"
85        ))
86    }
87}
88
89pub(crate) fn current_crate_version() -> semver::Version {
90    let crate_version_raw = env!("CARGO_PKG_VERSION");
91
92    semver::Version::parse(crate_version_raw).unwrap()
93}
94
95pub(crate) fn versions_compatible(
96    crate_version: &semver::Version,
97    specified_version: &semver::Version,
98) -> Result<bool, String> {
99    let req = semver::VersionReq::parse(&crate_version.to_string()).map_err(|error| {
100        format!("failed to parse crate version `{crate_version}` as `VersionReq`: {error}")
101    })?;
102    let specified_dora_req = semver::VersionReq::parse(&specified_version.to_string())
103        .map_err(|error| {
104            format!(
105                "failed to parse specified dora version `{specified_version}` as `VersionReq`: {error}",
106            )
107        })?;
108    let matches = req.matches(specified_version) || specified_dora_req.matches(crate_version);
109    Ok(matches)
110}