Skip to main content

rat/
lib.rs

1use anyhow::anyhow;
2use log::{error, info};
3use rkyv::{
4    api::high::{HighSerializer, HighValidator},
5    bytecheck::CheckBytes,
6    de::Pool,
7    rancor::Strategy,
8    ser::allocator::ArenaHandle,
9    util::AlignedVec,
10};
11use std::sync::{Arc, LazyLock, Mutex};
12
13use mt_sea::{ship::NetworkShipImpl, *};
14
15pub use mt_sea::VariableType;
16pub use mt_sea::net::NetArray;
17pub use rkyv::{Archive, Deserialize, Serialize};
18
19pub struct Rat {
20    name: String,
21    ship: Option<NetworkShipImpl>,
22}
23
24pub fn rfalse() -> NetArray<u8> {
25    nalgebra::DMatrix::<u8>::zeros(1, 1).into()
26}
27
28pub fn rtrue() -> NetArray<u8> {
29    let rf = rfalse();
30    let mut rf = nalgebra::DMatrix::<u8>::from(rf);
31    unsafe { *rf.get_unchecked_mut((0, 0)) = 1 };
32    rf.into()
33}
34
35static RT: LazyLock<Mutex<Option<Arc<tokio::runtime::Runtime>>>> =
36    LazyLock::new(|| Mutex::new(None));
37static RAT: LazyLock<Mutex<Option<Rat>>> = LazyLock::new(|| Mutex::new(None));
38
39impl Rat {
40    fn create(
41        name: &str,
42        timeout: Option<std::time::Duration>,
43        rt: Arc<tokio::runtime::Runtime>,
44    ) -> anyhow::Result<Self> {
45        let ship = rt.block_on(async {
46            let init_future = mt_sea::ship::NetworkShipImpl::init(
47                ShipKind::Rat(name.to_string()),
48                false,
49                mt_sea::Qos::Reliable,
50            );
51
52            match timeout {
53                None => Ok(Some(init_future.await?)),
54                Some(t) => match tokio::time::timeout(t, init_future).await {
55                    Err(_) => Ok::<Option<NetworkShipImpl>, anyhow::Error>(None),
56                    Ok(t) => Ok(Some(t?)),
57                },
58            }
59        })?;
60
61        Ok(Self {
62            name: name.to_string(),
63            ship,
64        })
65    }
66}
67
68pub fn init(
69    node_name: &str,
70    timeout: Option<std::time::Duration>,
71    runtime: Option<Arc<tokio::runtime::Runtime>>,
72) -> anyhow::Result<()> {
73    let mut rat_arc = RAT
74        .lock()
75        .map_err(|e| anyhow::anyhow!("Failed to lock rat: {}", e))?;
76
77    if rat_arc.is_some() {
78        return Err(anyhow::anyhow!("Rat already initialized"));
79    }
80
81    let mut srt = RT.lock().unwrap();
82    if let Some(rt) = runtime {
83        srt.replace(rt);
84    }
85
86    if srt.is_none() {
87        srt.replace(Arc::new(
88            tokio::runtime::Builder::new_current_thread()
89                .enable_all()
90                .build()
91                .unwrap(),
92        ));
93    }
94
95    let rt = srt.as_ref().expect("just set").clone();
96    let new_rat = Rat::create(node_name, timeout, rt)?;
97    rat_arc.replace(new_rat);
98
99    Ok(())
100}
101
102pub fn deinit() -> anyhow::Result<()> {
103    let mut rat_arc = RAT
104        .lock()
105        .map_err(|e| anyhow::anyhow!("Failed to lock rat: {}", e))?;
106
107    if rat_arc.is_none() {
108        return Err(anyhow::anyhow!("Rat not initialized"));
109    }
110
111    rat_arc.take();
112    Ok(())
113}
114
115/// When the code reaches a variable that is watched, call this function to communicate synchronously with the link.
116/// It syncs with the other rats and gets the action to be taken for the current var.
117/// It then applies the action to the variable and returns.
118pub fn bacon<T>(
119    variable_name: &str,
120    data: &mut T,
121    variable_type: VariableType,
122) -> anyhow::Result<()>
123where
124    T: Archive,
125    T::Archived: for<'a> CheckBytes<HighValidator<'a, rkyv::rancor::Error>>
126        + Deserialize<T, Strategy<Pool, rkyv::rancor::Error>>,
127    T: 'static + Send,
128    T: for<'a> Serialize<HighSerializer<AlignedVec, ArenaHandle<'a>, rkyv::rancor::Error>>,
129    T: Send + Sync,
130{
131    let rat_arc = RAT
132        .lock()
133        .map_err(|e| anyhow::anyhow!("Failed to lock rat: {}", e))?;
134
135    let rat = rat_arc
136        .as_ref()
137        .ok_or(anyhow::anyhow!("Rat not initialized"))?;
138
139    let srt = RT.lock().unwrap();
140    let rt = srt.as_ref().ok_or(anyhow!(
141        "Async Runtime not initialized. Call init() before calling bacon()."
142    ))?;
143
144    if let Some(rat_ship) = rat.ship.as_ref() {
145        rt.block_on(async move {
146            match rat_ship.ask_for_action(variable_name).await {
147                Ok((mt_sea::Action::Sail, lock_until_ack)) => {
148                    info!("Rat {} sails for variable {}", rat.name, variable_name);
149                    let receiver = lock_until_ack.then_some({
150                        let client = rat_ship.client.lock().await;
151                        let sender = client.coordinator_receive.read().unwrap();
152                        sender
153                            .as_ref()
154                            .expect("How are we receiving anything in the client? :)")
155                            .subscribe()
156                    });
157
158                    if let Some(mut receiver) = receiver {
159                        info!("Locked...");
160                        loop {
161                            let (packet, _) = receiver.recv().await?;
162                            if matches!(packet.data, net::PacketKind::Acknowledge) {
163                                break;
164                            }
165                        }
166                        info!("Unlocked");
167                    }
168
169                    Ok(())
170                }
171                Ok((mt_sea::Action::Shoot { target, id }, lock_until_ack)) => {
172                    info!("Rat {} shoots {} at {:?}", rat.name, variable_name, target);
173
174                    let receiver = lock_until_ack.then_some({
175                        let client = rat_ship.client.lock().await;
176                        let sender = client.coordinator_receive.read().unwrap();
177                        sender
178                            .as_ref()
179                            .expect("How are we receiving anything in the client? :)")
180                            .subscribe()
181                    });
182
183                    rat_ship
184                        .get_cannon()
185                        .shoot(&target, id, data, variable_type, variable_name)
186                        .await?;
187
188                    if let Some(mut receiver) = receiver {
189                        info!("Locked...");
190                        loop {
191                            let (packet, _) = receiver.recv().await?;
192                            if matches!(packet.data, net::PacketKind::Acknowledge) {
193                                break;
194                            }
195                        }
196                        info!("Unlocked");
197                    }
198
199                    info!(
200                        "Rat {} finished shooting {} at {:?}",
201                        rat.name, variable_name, target
202                    );
203
204                    Ok(())
205                }
206                Ok((mt_sea::Action::Catch { source, id }, lock_until_ack)) => {
207                    info!(
208                        "Rat {} catches {} from {:?}",
209                        rat.name, variable_name, source
210                    );
211
212                    let receiver = lock_until_ack.then_some({
213                        let client = rat_ship.client.lock().await;
214                        let sender = client.coordinator_receive.read().unwrap();
215                        sender
216                            .as_ref()
217                            .expect("How are we receiving anything in the client? :)")
218                            .subscribe()
219                    });
220
221                    let mut recv_data = rat_ship.get_cannon().catch::<T>(id).await?;
222
223                    info!(
224                        "Rat {} finished catching {} from {:?}",
225                        rat.name, variable_name, source
226                    );
227
228                    // The first index is the newest
229                    *data = recv_data.remove(0);
230
231                    if let Some(mut receiver) = receiver {
232                        info!("Locked...");
233                        loop {
234                            let (packet, _) = receiver.recv().await?;
235                            if matches!(packet.data, net::PacketKind::Acknowledge) {
236                                break;
237                            }
238                        }
239                        info!("Unlocked");
240                    }
241
242                    Ok(())
243                }
244                Err(e) => {
245                    error!("Failed to get action: {}", e);
246                    Err(e)
247                }
248            }
249        })
250    } else {
251        Ok(())
252    }
253}
254
255// C FFI
256#[cfg(all(target_arch = "aarch64", target_vendor = "apple"))]
257type CFfiString = i8;
258
259#[cfg(all(
260    not(target_arch = "x86"),
261    not(target_arch = "x86_64"),
262    not(target_vendor = "apple")
263))]
264type CFfiString = u8;
265
266#[cfg(any(
267    target_arch = "x86",
268    target_arch = "x86_64",
269    all(target_vendor = "apple", not(target_arch = "aarch64"))
270))]
271type CFfiString = i8;
272
273#[unsafe(no_mangle)]
274/// # Safety
275/// C interop
276pub unsafe extern "C" fn rat_init(node_name: *const CFfiString, timeout_secs: i32) -> i32 {
277    let init = || {
278        let node_name = unsafe { std::ffi::CStr::from_ptr(node_name) };
279        let node_name = node_name.to_str().unwrap();
280
281        let timeout = if timeout_secs <= 0 {
282            None
283        } else {
284            Some(std::time::Duration::from_secs(timeout_secs as u64))
285        };
286
287        init(node_name, timeout, None)
288    };
289
290    #[cfg(panic = "unwind")]
291    {
292        let catch = std::panic::catch_unwind(init);
293
294        match catch {
295            Ok(Ok(_)) => 0,
296            Ok(Err(e)) => {
297                error!("Could not initialize Rat: {e}.");
298                -1
299            }
300            Err(_) => {
301                error!("Rust did panic unexpectedly.");
302                -2
303            }
304        }
305    }
306
307    #[cfg(not(panic = "unwind"))]
308    {
309        let d = init();
310        match d {
311            Ok(_) => 0,
312            Err(e) => {
313                error!("Could not initialize Rat: {e}.");
314                -1
315            }
316        }
317    }
318}
319
320#[unsafe(no_mangle)]
321/// # Safety
322/// C interop
323pub unsafe extern "C" fn rat_deinit() -> i32 {
324    #[cfg(panic = "unwind")]
325    {
326        let catch = std::panic::catch_unwind(deinit);
327
328        match catch {
329            Ok(Ok(_)) => 0,
330            Ok(Err(e)) => {
331                error!("Could not deinitialize Rat: {e}.");
332                -1
333            }
334            Err(_) => {
335                error!("Rust did panic unexpectedly.");
336                -2
337            }
338        }
339    }
340
341    #[cfg(not(panic = "unwind"))]
342    {
343        let d = deinit();
344        match d {
345            Ok(_) => 0,
346            Err(e) => {
347                error!("Could not deinitialize Rat: {e}.");
348                -1
349            }
350        }
351    }
352}
353
354#[unsafe(no_mangle)]
355/// Matrix must be in column-major order.
356/// # Safety
357/// C interop
358pub unsafe extern "C" fn rat_bacon_f32(
359    variable_name: *const CFfiString,
360    data: *mut f32,
361    rows: usize,
362    cols: usize,
363) -> i32 {
364    let f = || {
365        let variable_name = unsafe { std::ffi::CStr::from_ptr(variable_name) };
366        let variable_name = variable_name.to_str().unwrap();
367
368        let data = unsafe { std::slice::from_raw_parts_mut(data, rows * cols) };
369        let matrix = nalgebra::DMatrix::from_column_slice(rows, cols, data);
370
371        let mut net_mat = NetArray::from(matrix);
372        bacon(variable_name, &mut net_mat, VariableType::F32).map(|_| {
373            let matrix: nalgebra::DMatrix<f32> = net_mat.into();
374            for c in 0..cols {
375                for r in 0..rows {
376                    data[c * rows + r] = matrix[(r, c)];
377                }
378            }
379        })
380    };
381
382    #[cfg(panic = "unwind")]
383    {
384        let catch = std::panic::catch_unwind(f);
385
386        match catch {
387            Ok(Ok(_)) => 0,
388            Ok(Err(e)) => {
389                error!("Failed to bacon: {}", e);
390                -1
391            }
392            Err(_) => {
393                error!("Rust did panic unexpectedly.");
394                -2
395            }
396        }
397    }
398
399    #[cfg(not(panic = "unwind"))]
400    {
401        let d = f();
402        match d {
403            Ok(_) => 0,
404            Err(e) => {
405                error!("Could not deinitialize Rat: {e}.");
406                -1
407            }
408        }
409    }
410}
411
412#[unsafe(no_mangle)]
413/// Matrix must be in column-major order.
414/// # Safety
415/// C interop
416pub unsafe extern "C" fn rat_bacon_f64(
417    variable_name: *const CFfiString,
418    data: *mut f64,
419    rows: usize,
420    cols: usize,
421) -> i32 {
422    let f = || {
423        let variable_name = unsafe { std::ffi::CStr::from_ptr(variable_name) };
424        let variable_name = variable_name.to_str().unwrap();
425
426        let data = unsafe { std::slice::from_raw_parts_mut(data, rows * cols) };
427        let matrix = nalgebra::DMatrix::from_column_slice(rows, cols, data);
428
429        let mut net_mat = NetArray::from(matrix);
430        bacon(variable_name, &mut net_mat, VariableType::F64).map(|_| {
431            let matrix: nalgebra::DMatrix<f64> = net_mat.into();
432            for c in 0..cols {
433                for r in 0..rows {
434                    data[c * rows + r] = matrix[(r, c)];
435                }
436            }
437        })
438    };
439
440    #[cfg(panic = "unwind")]
441    {
442        let catch = std::panic::catch_unwind(f);
443
444        match catch {
445            Ok(Ok(_)) => 0,
446            Ok(Err(e)) => {
447                error!("Failed to bacon: {}", e);
448                -1
449            }
450            Err(_) => {
451                error!("Rust did panic unexpectedly.");
452                -2
453            }
454        }
455    }
456    #[cfg(not(panic = "unwind"))]
457    {
458        let d = f();
459        match d {
460            Ok(_) => 0,
461            Err(e) => {
462                error!("Could not deinitialize Rat: {e}.");
463                -1
464            }
465        }
466    }
467}
468
469#[unsafe(no_mangle)]
470/// Matrix must be in column-major order.
471/// # Safety
472/// C interop
473pub unsafe extern "C" fn rat_bacon_i32(
474    variable_name: *const CFfiString,
475    data: *mut i32,
476    rows: usize,
477    cols: usize,
478) -> i32 {
479    let f = || {
480        let variable_name = unsafe { std::ffi::CStr::from_ptr(variable_name) };
481        let variable_name = variable_name.to_str().unwrap();
482
483        let data = unsafe { std::slice::from_raw_parts_mut(data, rows * cols) };
484        let matrix = nalgebra::DMatrix::from_column_slice(rows, cols, data);
485
486        let mut net_mat = NetArray::from(matrix);
487        bacon(variable_name, &mut net_mat, VariableType::I32).map(|_| {
488            let matrix: nalgebra::DMatrix<i32> = net_mat.into();
489            for c in 0..cols {
490                for r in 0..rows {
491                    data[c * rows + r] = matrix[(r, c)];
492                }
493            }
494        })
495    };
496
497    #[cfg(panic = "unwind")]
498    {
499        let catch = std::panic::catch_unwind(f);
500
501        match catch {
502            Ok(Ok(_)) => 0,
503            Ok(Err(e)) => {
504                error!("Failed to bacon: {}", e);
505                -1
506            }
507            Err(_) => {
508                error!("Rust did panic unexpectedly.");
509                -2
510            }
511        }
512    }
513    #[cfg(not(panic = "unwind"))]
514    {
515        let d = f();
516        match d {
517            Ok(_) => 0,
518            Err(e) => {
519                error!("Could not deinitialize Rat: {e}.");
520                -1
521            }
522        }
523    }
524}
525
526#[unsafe(no_mangle)]
527/// Matrix must be in column-major order.
528/// # Safety
529/// C interop
530pub unsafe extern "C" fn rat_bacon_u8(
531    variable_name: *const CFfiString,
532    data: *mut u8,
533    rows: usize,
534    cols: usize,
535) -> i32 {
536    let f = || {
537        let variable_name = unsafe { std::ffi::CStr::from_ptr(variable_name) };
538        let variable_name = variable_name.to_str().unwrap();
539
540        let data = unsafe { std::slice::from_raw_parts_mut(data, rows * cols) };
541        let matrix = nalgebra::DMatrix::from_column_slice(rows, cols, data);
542
543        let mut net_mat = NetArray::from(matrix);
544        bacon(variable_name, &mut net_mat, VariableType::U8).map(|_| {
545            let matrix: nalgebra::DMatrix<u8> = net_mat.into();
546            for c in 0..cols {
547                for r in 0..rows {
548                    data[c * rows + r] = matrix[(r, c)];
549                }
550            }
551        })
552    };
553
554    #[cfg(panic = "unwind")]
555    {
556        let catch = std::panic::catch_unwind(f);
557
558        match catch {
559            Ok(Ok(_)) => 0,
560            Ok(Err(e)) => {
561                error!("Failed to bacon: {}", e);
562                -1
563            }
564            Err(_) => {
565                error!("Rust did panic unexpectedly.");
566                -2
567            }
568        }
569    }
570    #[cfg(not(panic = "unwind"))]
571    {
572        let d = f();
573        match d {
574            Ok(_) => 0,
575            Err(e) => {
576                error!("Could not deinitialize Rat: {e}.");
577                -1
578            }
579        }
580    }
581}