Skip to main content

rat/
lib.rs

1use anyhow::anyhow;
2use log::{error, info};
3use rkyv::{
4    api::high::{HighSerializer, HighValidator},
5    bytecheck::CheckBytes,
6    de::Pool,
7    rancor::Strategy,
8    ser::allocator::ArenaHandle,
9    util::AlignedVec,
10};
11use std::sync::{Arc, LazyLock, Mutex};
12
13use mt_sea::{ship::NetworkShipImpl, *};
14
15pub use mt_sea::VariableType;
16pub use mt_sea::net::NetArray;
17pub use rkyv::{Archive, Deserialize, Serialize};
18
19pub struct Rat {
20    name: String,
21    ship: Option<NetworkShipImpl>,
22}
23
24pub fn rfalse() -> NetArray<u8> {
25    nalgebra::DMatrix::<u8>::zeros(1, 1).into()
26}
27
28pub fn rtrue() -> NetArray<u8> {
29    let rf = rfalse();
30    let mut rf = nalgebra::DMatrix::<u8>::from(rf);
31    unsafe { *rf.get_unchecked_mut((0, 0)) = 1 };
32    rf.into()
33}
34
35static RT: LazyLock<Mutex<Option<Arc<tokio::runtime::Runtime>>>> =
36    LazyLock::new(|| Mutex::new(None));
37static RAT: LazyLock<Mutex<Option<Rat>>> = LazyLock::new(|| Mutex::new(None));
38
39impl Rat {
40    fn create(
41        name: &str,
42        timeout: Option<std::time::Duration>,
43        rt: Arc<tokio::runtime::Runtime>,
44    ) -> anyhow::Result<Self> {
45        let ship = rt.block_on(async {
46            let init_future =
47                mt_sea::ship::NetworkShipImpl::init(ShipKind::Rat(name.to_string()), None, false);
48
49            match timeout {
50                None => Ok(Some(init_future.await?)),
51                Some(t) => match tokio::time::timeout(t, init_future).await {
52                    Err(_) => Ok::<Option<NetworkShipImpl>, anyhow::Error>(None),
53                    Ok(t) => Ok(Some(t?)),
54                },
55            }
56        })?;
57
58        Ok(Self {
59            name: name.to_string(),
60            ship,
61        })
62    }
63}
64
65pub fn init(
66    node_name: &str,
67    timeout: Option<std::time::Duration>,
68    runtime: Option<Arc<tokio::runtime::Runtime>>,
69) -> anyhow::Result<()> {
70    let mut rat_arc = RAT
71        .lock()
72        .map_err(|e| anyhow::anyhow!("Failed to lock rat: {}", e))?;
73
74    if rat_arc.is_some() {
75        return Err(anyhow::anyhow!("Rat already initialized"));
76    }
77
78    let mut srt = RT.lock().unwrap();
79    if let Some(rt) = runtime {
80        srt.replace(rt);
81    }
82
83    if srt.is_none() {
84        srt.replace(Arc::new(
85            tokio::runtime::Builder::new_current_thread()
86                .enable_all()
87                .build()
88                .unwrap(),
89        ));
90    }
91
92    let rt = srt.as_ref().expect("just set").clone();
93    let new_rat = Rat::create(node_name, timeout, rt)?;
94    rat_arc.replace(new_rat);
95
96    Ok(())
97}
98
99pub fn deinit() -> anyhow::Result<()> {
100    let mut rat_arc = RAT
101        .lock()
102        .map_err(|e| anyhow::anyhow!("Failed to lock rat: {}", e))?;
103
104    if rat_arc.is_none() {
105        return Err(anyhow::anyhow!("Rat not initialized"));
106    }
107
108    rat_arc.take();
109    Ok(())
110}
111
112/// When the code reaches a variable that is watched, call this function to communicate synchronously with the link.
113/// It syncs with the other rats and gets the action to be taken for the current var.
114/// It then applies the action to the variable and returns.
115pub fn bacon<T>(
116    variable_name: &str,
117    data: &mut T,
118    variable_type: VariableType,
119) -> anyhow::Result<()>
120where
121    T: Archive,
122    T::Archived: for<'a> CheckBytes<HighValidator<'a, rkyv::rancor::Error>>
123        + Deserialize<T, Strategy<Pool, rkyv::rancor::Error>>,
124    T: 'static + Send,
125    T: for<'a> Serialize<HighSerializer<AlignedVec, ArenaHandle<'a>, rkyv::rancor::Error>>,
126    T: Send + Sync,
127{
128    let rat_arc = RAT
129        .lock()
130        .map_err(|e| anyhow::anyhow!("Failed to lock rat: {}", e))?;
131
132    let rat = rat_arc
133        .as_ref()
134        .ok_or(anyhow::anyhow!("Rat not initialized"))?;
135
136    let srt = RT.lock().unwrap();
137    let rt = srt.as_ref().ok_or(anyhow!(
138        "Async Runtime not initialized. Call init() before calling bacon()."
139    ))?;
140
141    if let Some(rat_ship) = rat.ship.as_ref() {
142        rt.block_on(async move {
143            match rat_ship.ask_for_action(variable_name).await {
144                Ok((mt_sea::Action::Sail, lock_until_ack)) => {
145                    info!("Rat {} sails for variable {}", rat.name, variable_name);
146                    let receiver = lock_until_ack.then_some({
147                        let client = rat_ship.client.lock().await;
148                        let sender = client.coordinator_receive.read().unwrap();
149                        sender
150                            .as_ref()
151                            .expect("How are we receiving anything in the client? :)")
152                            .subscribe()
153                    });
154
155                    if let Some(mut receiver) = receiver {
156                        info!("Locked...");
157                        loop {
158                            let (packet, _) = receiver.recv().await?;
159                            if matches!(packet.data, net::PacketKind::Acknowledge) {
160                                break;
161                            }
162                        }
163                        info!("Unlocked");
164                    }
165
166                    Ok(())
167                }
168                Ok((mt_sea::Action::Shoot { target, id }, lock_until_ack)) => {
169                    info!("Rat {} shoots {} at {:?}", rat.name, variable_name, target);
170
171                    let receiver = lock_until_ack.then_some({
172                        let client = rat_ship.client.lock().await;
173                        let sender = client.coordinator_receive.read().unwrap();
174                        sender
175                            .as_ref()
176                            .expect("How are we receiving anything in the client? :)")
177                            .subscribe()
178                    });
179
180                    rat_ship
181                        .get_cannon()
182                        .shoot(&target, id, data, variable_type, variable_name)
183                        .await?;
184
185                    if let Some(mut receiver) = receiver {
186                        info!("Locked...");
187                        loop {
188                            let (packet, _) = receiver.recv().await?;
189                            if matches!(packet.data, net::PacketKind::Acknowledge) {
190                                break;
191                            }
192                        }
193                        info!("Unlocked");
194                    }
195
196                    info!(
197                        "Rat {} finished shooting {} at {:?}",
198                        rat.name, variable_name, target
199                    );
200
201                    Ok(())
202                }
203                Ok((mt_sea::Action::Catch { source, id }, lock_until_ack)) => {
204                    info!(
205                        "Rat {} catches {} from {:?}",
206                        rat.name, variable_name, source
207                    );
208
209                    let receiver = lock_until_ack.then_some({
210                        let client = rat_ship.client.lock().await;
211                        let sender = client.coordinator_receive.read().unwrap();
212                        sender
213                            .as_ref()
214                            .expect("How are we receiving anything in the client? :)")
215                            .subscribe()
216                    });
217
218                    let mut recv_data = rat_ship.get_cannon().catch::<T>(id).await?;
219
220                    info!(
221                        "Rat {} finished catching {} from {:?}",
222                        rat.name, variable_name, source
223                    );
224
225                    // The first index is the newest
226                    *data = recv_data.remove(0);
227
228                    if let Some(mut receiver) = receiver {
229                        info!("Locked...");
230                        loop {
231                            let (packet, _) = receiver.recv().await?;
232                            if matches!(packet.data, net::PacketKind::Acknowledge) {
233                                break;
234                            }
235                        }
236                        info!("Unlocked");
237                    }
238
239                    Ok(())
240                }
241                Err(e) => {
242                    error!("Failed to get action: {}", e);
243                    Err(e)
244                }
245            }
246        })
247    } else {
248        Ok(())
249    }
250}
251
252// C FFI
253#[cfg(all(target_arch = "aarch64", target_vendor = "apple"))]
254type CFfiString = i8;
255
256#[cfg(all(
257    not(target_arch = "x86"),
258    not(target_arch = "x86_64"),
259    not(target_vendor = "apple")
260))]
261type CFfiString = u8;
262
263#[cfg(any(
264    target_arch = "x86",
265    target_arch = "x86_64",
266    all(target_vendor = "apple", not(target_arch = "aarch64"))
267))]
268type CFfiString = i8;
269
270#[unsafe(no_mangle)]
271/// # Safety
272/// C interop
273pub unsafe extern "C" fn rat_init(node_name: *const CFfiString, timeout_secs: i32) -> i32 {
274    let init = || {
275        let node_name = unsafe { std::ffi::CStr::from_ptr(node_name) };
276        let node_name = node_name.to_str().unwrap();
277
278        let timeout = if timeout_secs <= 0 {
279            None
280        } else {
281            Some(std::time::Duration::from_secs(timeout_secs as u64))
282        };
283
284        init(node_name, timeout, None)
285    };
286
287    #[cfg(panic = "unwind")]
288    {
289        let catch = std::panic::catch_unwind(init);
290
291        match catch {
292            Ok(Ok(_)) => 0,
293            Ok(Err(e)) => {
294                error!("Could not initialize Rat: {e}.");
295                -1
296            }
297            Err(_) => {
298                error!("Rust did panic unexpectedly.");
299                -2
300            }
301        }
302    }
303
304    #[cfg(not(panic = "unwind"))]
305    {
306        let d = init();
307        match d {
308            Ok(_) => 0,
309            Err(e) => {
310                error!("Could not initialize Rat: {e}.");
311                -1
312            }
313        }
314    }
315}
316
317#[unsafe(no_mangle)]
318/// # Safety
319/// C interop
320pub unsafe extern "C" fn rat_deinit() -> i32 {
321    #[cfg(panic = "unwind")]
322    {
323        let catch = std::panic::catch_unwind(deinit);
324
325        match catch {
326            Ok(Ok(_)) => 0,
327            Ok(Err(e)) => {
328                error!("Could not deinitialize Rat: {e}.");
329                -1
330            }
331            Err(_) => {
332                error!("Rust did panic unexpectedly.");
333                -2
334            }
335        }
336    }
337
338    #[cfg(not(panic = "unwind"))]
339    {
340        let d = deinit();
341        match d {
342            Ok(_) => 0,
343            Err(e) => {
344                error!("Could not deinitialize Rat: {e}.");
345                -1
346            }
347        }
348    }
349}
350
351#[unsafe(no_mangle)]
352/// Matrix must be in column-major order.
353/// # Safety
354/// C interop
355pub unsafe extern "C" fn rat_bacon_f32(
356    variable_name: *const CFfiString,
357    data: *mut f32,
358    rows: usize,
359    cols: usize,
360) -> i32 {
361    let f = || {
362        let variable_name = unsafe { std::ffi::CStr::from_ptr(variable_name) };
363        let variable_name = variable_name.to_str().unwrap();
364
365        let data = unsafe { std::slice::from_raw_parts_mut(data, rows * cols) };
366        let matrix = nalgebra::DMatrix::from_column_slice(rows, cols, data);
367
368        let mut net_mat = NetArray::from(matrix);
369        bacon(variable_name, &mut net_mat, VariableType::F32).map(|_| {
370            let matrix: nalgebra::DMatrix<f32> = net_mat.into();
371            for c in 0..cols {
372                for r in 0..rows {
373                    data[c * rows + r] = matrix[(r, c)];
374                }
375            }
376        })
377    };
378
379    #[cfg(panic = "unwind")]
380    {
381        let catch = std::panic::catch_unwind(f);
382
383        match catch {
384            Ok(Ok(_)) => 0,
385            Ok(Err(e)) => {
386                error!("Failed to bacon: {}", e);
387                -1
388            }
389            Err(_) => {
390                error!("Rust did panic unexpectedly.");
391                -2
392            }
393        }
394    }
395
396    #[cfg(not(panic = "unwind"))]
397    {
398        let d = f();
399        match d {
400            Ok(_) => 0,
401            Err(e) => {
402                error!("Could not deinitialize Rat: {e}.");
403                -1
404            }
405        }
406    }
407}
408
409#[unsafe(no_mangle)]
410/// Matrix must be in column-major order.
411/// # Safety
412/// C interop
413pub unsafe extern "C" fn rat_bacon_f64(
414    variable_name: *const CFfiString,
415    data: *mut f64,
416    rows: usize,
417    cols: usize,
418) -> i32 {
419    let f = || {
420        let variable_name = unsafe { std::ffi::CStr::from_ptr(variable_name) };
421        let variable_name = variable_name.to_str().unwrap();
422
423        let data = unsafe { std::slice::from_raw_parts_mut(data, rows * cols) };
424        let matrix = nalgebra::DMatrix::from_column_slice(rows, cols, data);
425
426        let mut net_mat = NetArray::from(matrix);
427        bacon(variable_name, &mut net_mat, VariableType::F64).map(|_| {
428            let matrix: nalgebra::DMatrix<f64> = net_mat.into();
429            for c in 0..cols {
430                for r in 0..rows {
431                    data[c * rows + r] = matrix[(r, c)];
432                }
433            }
434        })
435    };
436
437    #[cfg(panic = "unwind")]
438    {
439        let catch = std::panic::catch_unwind(f);
440
441        match catch {
442            Ok(Ok(_)) => 0,
443            Ok(Err(e)) => {
444                error!("Failed to bacon: {}", e);
445                -1
446            }
447            Err(_) => {
448                error!("Rust did panic unexpectedly.");
449                -2
450            }
451        }
452    }
453    #[cfg(not(panic = "unwind"))]
454    {
455        let d = f();
456        match d {
457            Ok(_) => 0,
458            Err(e) => {
459                error!("Could not deinitialize Rat: {e}.");
460                -1
461            }
462        }
463    }
464}
465
466#[unsafe(no_mangle)]
467/// Matrix must be in column-major order.
468/// # Safety
469/// C interop
470pub unsafe extern "C" fn rat_bacon_i32(
471    variable_name: *const CFfiString,
472    data: *mut i32,
473    rows: usize,
474    cols: usize,
475) -> i32 {
476    let f = || {
477        let variable_name = unsafe { std::ffi::CStr::from_ptr(variable_name) };
478        let variable_name = variable_name.to_str().unwrap();
479
480        let data = unsafe { std::slice::from_raw_parts_mut(data, rows * cols) };
481        let matrix = nalgebra::DMatrix::from_column_slice(rows, cols, data);
482
483        let mut net_mat = NetArray::from(matrix);
484        bacon(variable_name, &mut net_mat, VariableType::I32).map(|_| {
485            let matrix: nalgebra::DMatrix<i32> = net_mat.into();
486            for c in 0..cols {
487                for r in 0..rows {
488                    data[c * rows + r] = matrix[(r, c)];
489                }
490            }
491        })
492    };
493
494    #[cfg(panic = "unwind")]
495    {
496        let catch = std::panic::catch_unwind(f);
497
498        match catch {
499            Ok(Ok(_)) => 0,
500            Ok(Err(e)) => {
501                error!("Failed to bacon: {}", e);
502                -1
503            }
504            Err(_) => {
505                error!("Rust did panic unexpectedly.");
506                -2
507            }
508        }
509    }
510    #[cfg(not(panic = "unwind"))]
511    {
512        let d = f();
513        match d {
514            Ok(_) => 0,
515            Err(e) => {
516                error!("Could not deinitialize Rat: {e}.");
517                -1
518            }
519        }
520    }
521}
522
523#[unsafe(no_mangle)]
524/// Matrix must be in column-major order.
525/// # Safety
526/// C interop
527pub unsafe extern "C" fn rat_bacon_u8(
528    variable_name: *const CFfiString,
529    data: *mut u8,
530    rows: usize,
531    cols: usize,
532) -> i32 {
533    let f = || {
534        let variable_name = unsafe { std::ffi::CStr::from_ptr(variable_name) };
535        let variable_name = variable_name.to_str().unwrap();
536
537        let data = unsafe { std::slice::from_raw_parts_mut(data, rows * cols) };
538        let matrix = nalgebra::DMatrix::from_column_slice(rows, cols, data);
539
540        let mut net_mat = NetArray::from(matrix);
541        bacon(variable_name, &mut net_mat, VariableType::U8).map(|_| {
542            let matrix: nalgebra::DMatrix<u8> = net_mat.into();
543            for c in 0..cols {
544                for r in 0..rows {
545                    data[c * rows + r] = matrix[(r, c)];
546                }
547            }
548        })
549    };
550
551    #[cfg(panic = "unwind")]
552    {
553        let catch = std::panic::catch_unwind(f);
554
555        match catch {
556            Ok(Ok(_)) => 0,
557            Ok(Err(e)) => {
558                error!("Failed to bacon: {}", e);
559                -1
560            }
561            Err(_) => {
562                error!("Rust did panic unexpectedly.");
563                -2
564            }
565        }
566    }
567    #[cfg(not(panic = "unwind"))]
568    {
569        let d = f();
570        match d {
571            Ok(_) => 0,
572            Err(e) => {
573                error!("Could not deinitialize Rat: {e}.");
574                -1
575            }
576        }
577    }
578}