Skip to main content

mt_sea/
lib.rs

1pub mod client;
2pub mod coordinator;
3pub mod net;
4pub mod ship;
5
6use mt_net::{ActionPlan, BagMsg, Rules, VariableHuman};
7
8/// Initialize logging with zenoh logs filtered to warn level regardless of RUST_LOG setting.
9/// Uses RUST_LOG env var for other crates, defaulting to `info` if not set.
10pub fn init_logging() {
11    use env_logger::Env;
12    let env = Env::new().filter_or("RUST_LOG", "info");
13    env_logger::Builder::from_env(env)
14        .filter_module("zenoh", log::LevelFilter::Warn)
15        .filter_module("zenoh_transport", log::LevelFilter::Warn)
16        .filter_module("zenoh_link", log::LevelFilter::Warn)
17        .filter_module("zenoh_protocol", log::LevelFilter::Warn)
18        .init();
19}
20use rkyv::{
21    Archive, Deserialize, Serialize,
22    api::high::{HighSerializer, HighValidator},
23    bytecheck::CheckBytes,
24    de::Pool,
25    rancor::Strategy,
26    ser::allocator::ArenaHandle,
27    util::AlignedVec,
28};
29
/// The kind of a ship. Every variant carries a `String`.
///
/// NOTE(review): the `String` payload is presumably the ship's human-readable
/// name — confirm against the call sites.
#[derive(Debug, Clone, PartialEq, Archive, Serialize, Deserialize, Hash, Eq, PartialOrd, Ord)]
pub enum ShipKind {
    /// A "rat" ship.
    Rat(String),
    /// A "wind" ship.
    Wind(String),
}
35
/// Identifier of a ship, represented as a signed 128-bit integer.
pub type ShipName = i128;
37
/// Address record of a ship: a 4-byte IP, a port, the ship's [`ShipName`] and its
/// [`ShipKind`].
///
/// Only `port` and `kind` are declared `pub`; `ip` and `ship` are not.
#[derive(Debug, Clone, Serialize, Deserialize, Archive, PartialEq, Eq)]
pub struct NetworkShipAddress {
    // Four raw bytes — presumably the IPv4 octets; TODO confirm byte order with the producer.
    ip: [u8; 4],
    /// Port number belonging to this address.
    pub port: u16,
    // Identifier of the ship this address belongs to.
    ship: ShipName,
    /// Kind of the ship this address belongs to.
    pub kind: ShipKind,
}
45
/// An action a ship can be told to perform. Defaults to [`Action::Sail`].
#[derive(Debug, Archive, Clone, Default, Serialize, Deserialize)]
pub enum Action {
    /// The default variant; carries no data.
    /// NOTE(review): presumably "carry on without exchanging data" — confirm.
    #[default]
    Sail,
    /// Addressed at a list of ships, tagged with an `id`.
    Shoot {
        /// Addresses of the receiving ships.
        target: Vec<NetworkShipAddress>,
        /// Identifier of the exchange — presumably matched by the `id` of the
        /// corresponding `Catch`; verify against the cannon implementation.
        id: u32,
    },
    /// Addressed at a single source ship, tagged with an `id`.
    Catch {
        /// Address of the ship the data comes from.
        source: NetworkShipAddress,
        /// Identifier of the exchange (see the note on `Shoot::id`).
        id: u32,
    },
}
59
/// Pairs a ship with an optional [`Action`].
#[derive(Clone, Debug)]
pub struct Variable {
    /// Identifier of the ship this entry refers to.
    pub ship: ShipName,
    /// The action assigned to the ship, or `None` when nothing is assigned.
    pub strategy: Option<Action>,
}
65
66pub fn get_strategies(
67    haystack: &Rules,
68    rat_ship: &str,
69    variable: String,
70    indirect_parent_rat: Option<&str>,
71) -> Vec<ActionPlan> {
72    match haystack.raw().get(&variable) {
73        None => vec![ActionPlan::default()],
74        Some(plans) => {
75            // directly because rule was set
76            let directly = plans
77                .iter()
78                .filter(|plan| plan.ship == rat_ship)
79                .filter_map(|el| el.strategy.clone())
80                .collect::<Vec<_>>();
81
82            // as other part of a rule
83            let mut indirect = plans
84                .iter()
85                .filter(|plan| indirect_parent_rat.is_none_or(|parent_rat| plan.ship == parent_rat))
86                .filter_map(|plan| match plan.strategy.as_ref()? {
87                    ActionPlan::Sail => None,
88                    ActionPlan::Shoot { target, id } => target
89                        .iter()
90                        .find(|shoot_target| *shoot_target == rat_ship)
91                        .map(|_| ActionPlan::Catch {
92                            source: plan.ship.clone(),
93                            id: *id,
94                        }),
95                    ActionPlan::Catch { source, id } => {
96                        if source == rat_ship {
97                            Some(ActionPlan::Shoot {
98                                target: vec![source.clone()],
99                                id: *id,
100                            })
101                        } else {
102                            None
103                        }
104                    }
105                })
106                .collect::<Vec<_>>();
107
108            indirect.extend(directly);
109            indirect
110        }
111    }
112}
113
/// Interface of a ship: asking for actions, waiting for wind data and access to
/// the ship's [`Cannon`].
#[async_trait::async_trait]
pub trait Ship: Send + Sync + 'static {
    /// Indicate a trigger point and ask the link pilot what to do with the variable.
    ///
    /// Resolves to the [`Action`] to perform together with a `bool` flag.
    /// NOTE(review): the meaning of the flag is not visible in this file — possibly
    /// related to `lock_until_ack` on `Coordinator::rat_action_send`; confirm.
    async fn ask_for_action(&self, variable_name: &str) -> anyhow::Result<(Action, bool)>;

    // async fn wait_for_action(&self) -> anyhow::Result<crate::Action>;

    /// Wait for wind and resolve to the received batch of [`WindData`].
    async fn wait_for_wind(&self) -> anyhow::Result<Vec<WindData>>;

    /// Borrow the [`Cannon`] belonging to this ship.
    fn get_cannon(&self) -> &impl Cannon;
}
125
/// Type tag of a variable. Defaults to [`VariableType::StaticOnly`].
///
/// The `From` impls in this file convert the tag to and from a `u8`
/// (`StaticOnly` = 0, `U8` = 1, `I32` = 2, `F32` = 3, `F64` = 4).
#[derive(Archive, Serialize, Deserialize, Debug, Clone, Copy, Default)]
pub enum VariableType {
    #[default]
    StaticOnly, // statically supported but no dynamic conversion implemented
    /// Tag for `u8` values.
    U8,
    /// Tag for `i32` values.
    I32,
    /// Tag for `f32` values.
    F32,
    /// Tag for `f64` values.
    F64,
}
135
136impl From<u8> for VariableType {
137    fn from(value: u8) -> Self {
138        match value {
139            1 => Self::U8,
140            2 => Self::I32,
141            3 => Self::F32,
142            4 => Self::F64,
143            _ => Self::default(),
144        }
145    }
146}
147
148impl From<VariableType> for u8 {
149    fn from(value: VariableType) -> Self {
150        match value {
151            VariableType::StaticOnly => 0,
152            VariableType::U8 => 1,
153            VariableType::I32 => 2,
154            VariableType::F32 => 3,
155            VariableType::F64 => 4,
156        }
157    }
158}
159
160use rkyv::rancor::Error as RkyvError;
161
// Trait for types that can be Sent (Serialized).
// Requires Sized, Send, Sync, 'static, and the specific rkyv Serialize bound.
// In addition, the archived form of the type must be validatable with `CheckBytes`
// (using rkyv's `HighValidator`) and deserializable back into `Self`.
// The trait has no methods of its own; it only bundles these bounds under one name.
pub trait Sendable: Sized + Send + Sync + 'static
where
    Self: for<'b> Serialize<HighSerializer<AlignedVec, ArenaHandle<'b>, RkyvError>>,
    Self: Archive<
        Archived: for<'a> CheckBytes<HighValidator<'a, rkyv::rancor::Error>>
                      + Deserialize<Self, Strategy<Pool, rkyv::rancor::Error>>,
    >,
{
}
// Blanket implementation for Sendable. Any type meeting the bounds is Sendable.
// The `where` clause below repeats the bounds of the trait declaration, so no type
// ever needs (or is able) to add its own `impl Sendable`.
impl<T> Sendable for T
where
    T: Sized + Send + Sync + 'static,
    T: for<'b> Serialize<HighSerializer<AlignedVec, ArenaHandle<'b>, RkyvError>>,
    T: Archive<
        Archived: for<'a> CheckBytes<HighValidator<'a, rkyv::rancor::Error>>
                      + Deserialize<T, Strategy<Pool, rkyv::rancor::Error>>,
    >,
{
}
184
/// Data transport of a ship: sends ([`Cannon::shoot`]) and receives
/// ([`Cannon::catch`], [`Cannon::catch_dyn`]) values identified by a `u32` id.
#[async_trait::async_trait]
pub trait Cannon: Send + Sync + 'static {
    // Initialize a 1:1 connection to the target. Ports are shared using the sea network internally.

    /// Dump the data to the target.
    ///
    /// * `targets` – addresses of the ships to send to.
    /// * `id` – identifier of the exchange; presumably the same `id` the receiving
    ///   side passes to `catch` — confirm against the implementation.
    /// * `data` – the value to send; must be [`Sendable`].
    /// * `variable_type` – type tag transmitted alongside the data.
    /// * `variable_name` – name of the variable being sent.
    async fn shoot<'b, T: Sendable>(
        &self,
        targets: &'b [crate::NetworkShipAddress],
        id: u32,
        data: &T,
        variable_type: VariableType,
        variable_name: &str,
    ) -> anyhow::Result<()>;

    /// Catch the dumped data from the source.
    /// The returning Vec can contain previously missed entities of T from existing sync connections.
    /// The first item of T is the newest, followed by incremental older ones.
    async fn catch<T: Sendable>(&self, id: u32) -> anyhow::Result<Vec<T>>;

    /// Variant of [`Cannon::catch`] that is not generic over the payload type.
    ///
    /// Each entry is a `(String, VariableType, String)` tuple.
    /// NOTE(review): the meaning of the two strings (possibly a textual value and
    /// the variable name) is not visible in this file — confirm with the implementation.
    async fn catch_dyn(&self, id: u32) -> anyhow::Result<Vec<(String, VariableType, String)>>;
}
206
/// A timestamp split into a seconds part and a nanoseconds part.
///
/// NOTE(review): the epoch `sec` counts from is not visible in this file — confirm.
#[derive(Clone, Debug, Default, Copy, Archive, Serialize, Deserialize, PartialEq)]
pub struct TimeMsg {
    /// Seconds part (signed).
    pub sec: i32,
    /// Nanoseconds part (unsigned).
    pub nanosec: u32,
}
212
/// Message header made of a sequence number, a [`TimeMsg`] stamp and a frame id.
///
/// NOTE(review): the field layout resembles a ROS message header — confirm that
/// this is the intended counterpart.
#[derive(Clone, Debug, Default, PartialEq, Archive, Serialize, Deserialize)]
pub struct Header {
    /// Sequence number.
    pub seq: u32,
    /// Timestamp of the message.
    pub stamp: TimeMsg,
    /// Identifier of the frame the message refers to.
    pub frame_id: String,
}
219
/// Payload carried by "wind"; an alias for `mt_net::BagMsg`.
pub type WindData = BagMsg;
221
/// Coordinator-side interface: action request queues, wind distribution and
/// sending action plans to ships. Ships are addressed by a `String`.
#[async_trait::async_trait]
pub trait Coordinator: Send + Sync + 'static {
    /// Obtain a broadcast receiver of `String` messages for the given `ship`.
    ///
    /// NOTE(review): the strings are presumably the names of the variables the ship
    /// asks an action for — confirm with the implementation.
    async fn rat_action_request_queue(
        &self,
        ship: String,
    ) -> anyhow::Result<tokio::sync::broadcast::Receiver<String>>;

    /// Deliver a batch of [`WindData`] to the given `ship`.
    async fn blow_wind(&self, ship: String, data: Vec<WindData>) -> anyhow::Result<()>;

    /// Send the [`ActionPlan`] chosen for `variable` to the given `ship`.
    ///
    /// NOTE(review): `lock_until_ack` presumably makes the call wait until the ship
    /// acknowledges the action — confirm with the implementation.
    async fn rat_action_send(
        &self,
        ship: String,
        variable: String,
        action: ActionPlan,
        lock_until_ack: bool,
    ) -> anyhow::Result<()>;
}