1use anyhow::anyhow;
2use log::{error, info};
3use rkyv::{
4 api::high::{HighSerializer, HighValidator},
5 bytecheck::CheckBytes,
6 de::Pool,
7 rancor::Strategy,
8 ser::allocator::ArenaHandle,
9 util::AlignedVec,
10};
11use std::sync::{Arc, LazyLock, Mutex};
12
13use mt_sea::{ship::NetworkShipImpl, *};
14
15pub use mt_sea::VariableType;
16pub use mt_sea::net::NetArray;
17pub use rkyv::{Archive, Deserialize, Serialize};
18
19pub struct Rat {
20 name: String,
21 ship: Option<NetworkShipImpl>,
22}
23
24pub fn rfalse() -> NetArray<u8> {
25 nalgebra::DMatrix::<u8>::zeros(1, 1).into()
26}
27
28pub fn rtrue() -> NetArray<u8> {
29 let rf = rfalse();
30 let mut rf = nalgebra::DMatrix::<u8>::from(rf);
31 unsafe { *rf.get_unchecked_mut((0, 0)) = 1 };
32 rf.into()
33}
34
35static RT: LazyLock<Mutex<Option<Arc<tokio::runtime::Runtime>>>> =
36 LazyLock::new(|| Mutex::new(None));
37static RAT: LazyLock<Mutex<Option<Rat>>> = LazyLock::new(|| Mutex::new(None));
38
39impl Rat {
40 fn create(
41 name: &str,
42 timeout: Option<std::time::Duration>,
43 rt: Arc<tokio::runtime::Runtime>,
44 ) -> anyhow::Result<Self> {
45 let ship = rt.block_on(async {
46 let init_future = mt_sea::ship::NetworkShipImpl::init(
47 ShipKind::Rat(name.to_string()),
48 false,
49 mt_sea::Qos::Reliable,
50 );
51
52 match timeout {
53 None => Ok(Some(init_future.await?)),
54 Some(t) => match tokio::time::timeout(t, init_future).await {
55 Err(_) => Ok::<Option<NetworkShipImpl>, anyhow::Error>(None),
56 Ok(t) => Ok(Some(t?)),
57 },
58 }
59 })?;
60
61 Ok(Self {
62 name: name.to_string(),
63 ship,
64 })
65 }
66}
67
68pub fn init(
69 node_name: &str,
70 timeout: Option<std::time::Duration>,
71 runtime: Option<Arc<tokio::runtime::Runtime>>,
72) -> anyhow::Result<()> {
73 let mut rat_arc = RAT
74 .lock()
75 .map_err(|e| anyhow::anyhow!("Failed to lock rat: {}", e))?;
76
77 if rat_arc.is_some() {
78 return Err(anyhow::anyhow!("Rat already initialized"));
79 }
80
81 let mut srt = RT.lock().unwrap();
82 if let Some(rt) = runtime {
83 srt.replace(rt);
84 }
85
86 if srt.is_none() {
87 srt.replace(Arc::new(
88 tokio::runtime::Builder::new_current_thread()
89 .enable_all()
90 .build()
91 .unwrap(),
92 ));
93 }
94
95 let rt = srt.as_ref().expect("just set").clone();
96 let new_rat = Rat::create(node_name, timeout, rt)?;
97 rat_arc.replace(new_rat);
98
99 Ok(())
100}
101
102pub fn deinit() -> anyhow::Result<()> {
103 let mut rat_arc = RAT
104 .lock()
105 .map_err(|e| anyhow::anyhow!("Failed to lock rat: {}", e))?;
106
107 if rat_arc.is_none() {
108 return Err(anyhow::anyhow!("Rat not initialized"));
109 }
110
111 rat_arc.take();
112 Ok(())
113}
114
115pub fn bacon<T>(
119 variable_name: &str,
120 data: &mut T,
121 variable_type: VariableType,
122) -> anyhow::Result<()>
123where
124 T: Archive,
125 T::Archived: for<'a> CheckBytes<HighValidator<'a, rkyv::rancor::Error>>
126 + Deserialize<T, Strategy<Pool, rkyv::rancor::Error>>,
127 T: 'static + Send,
128 T: for<'a> Serialize<HighSerializer<AlignedVec, ArenaHandle<'a>, rkyv::rancor::Error>>,
129 T: Send + Sync,
130{
131 let rat_arc = RAT
132 .lock()
133 .map_err(|e| anyhow::anyhow!("Failed to lock rat: {}", e))?;
134
135 let rat = rat_arc
136 .as_ref()
137 .ok_or(anyhow::anyhow!("Rat not initialized"))?;
138
139 let srt = RT.lock().unwrap();
140 let rt = srt.as_ref().ok_or(anyhow!(
141 "Async Runtime not initialized. Call init() before calling bacon()."
142 ))?;
143
144 if let Some(rat_ship) = rat.ship.as_ref() {
145 rt.block_on(async move {
146 match rat_ship.ask_for_action(variable_name).await {
147 Ok((mt_sea::Action::Sail, lock_until_ack)) => {
148 info!("Rat {} sails for variable {}", rat.name, variable_name);
149 let receiver = lock_until_ack.then_some({
150 let client = rat_ship.client.lock().await;
151 let sender = client.coordinator_receive.read().unwrap();
152 sender
153 .as_ref()
154 .expect("How are we receiving anything in the client? :)")
155 .subscribe()
156 });
157
158 if let Some(mut receiver) = receiver {
159 info!("Locked...");
160 loop {
161 let (packet, _) = receiver.recv().await?;
162 if matches!(packet.data, net::PacketKind::Acknowledge) {
163 break;
164 }
165 }
166 info!("Unlocked");
167 }
168
169 Ok(())
170 }
171 Ok((mt_sea::Action::Shoot { target, id }, lock_until_ack)) => {
172 info!("Rat {} shoots {} at {:?}", rat.name, variable_name, target);
173
174 let receiver = lock_until_ack.then_some({
175 let client = rat_ship.client.lock().await;
176 let sender = client.coordinator_receive.read().unwrap();
177 sender
178 .as_ref()
179 .expect("How are we receiving anything in the client? :)")
180 .subscribe()
181 });
182
183 rat_ship
184 .get_cannon()
185 .shoot(&target, id, data, variable_type, variable_name)
186 .await?;
187
188 if let Some(mut receiver) = receiver {
189 info!("Locked...");
190 loop {
191 let (packet, _) = receiver.recv().await?;
192 if matches!(packet.data, net::PacketKind::Acknowledge) {
193 break;
194 }
195 }
196 info!("Unlocked");
197 }
198
199 info!(
200 "Rat {} finished shooting {} at {:?}",
201 rat.name, variable_name, target
202 );
203
204 Ok(())
205 }
206 Ok((mt_sea::Action::Catch { source, id }, lock_until_ack)) => {
207 info!(
208 "Rat {} catches {} from {:?}",
209 rat.name, variable_name, source
210 );
211
212 let receiver = lock_until_ack.then_some({
213 let client = rat_ship.client.lock().await;
214 let sender = client.coordinator_receive.read().unwrap();
215 sender
216 .as_ref()
217 .expect("How are we receiving anything in the client? :)")
218 .subscribe()
219 });
220
221 let mut recv_data = rat_ship.get_cannon().catch::<T>(id).await?;
222
223 info!(
224 "Rat {} finished catching {} from {:?}",
225 rat.name, variable_name, source
226 );
227
228 *data = recv_data.remove(0);
230
231 if let Some(mut receiver) = receiver {
232 info!("Locked...");
233 loop {
234 let (packet, _) = receiver.recv().await?;
235 if matches!(packet.data, net::PacketKind::Acknowledge) {
236 break;
237 }
238 }
239 info!("Unlocked");
240 }
241
242 Ok(())
243 }
244 Err(e) => {
245 error!("Failed to get action: {}", e);
246 Err(e)
247 }
248 }
249 })
250 } else {
251 Ok(())
252 }
253}
254
/// Character type of the C strings accepted by the `rat_*` FFI functions.
///
/// Aliases [`std::ffi::c_char`], the type `CStr::from_ptr` expects, so the
/// signedness always matches the target's C `char` instead of relying on a
/// hand-maintained table of architectures and vendors.
type CFfiString = std::ffi::c_char;
272
273#[unsafe(no_mangle)]
274pub unsafe extern "C" fn rat_init(node_name: *const CFfiString, timeout_secs: i32) -> i32 {
277 let init = || {
278 let node_name = unsafe { std::ffi::CStr::from_ptr(node_name) };
279 let node_name = node_name.to_str().unwrap();
280
281 let timeout = if timeout_secs <= 0 {
282 None
283 } else {
284 Some(std::time::Duration::from_secs(timeout_secs as u64))
285 };
286
287 init(node_name, timeout, None)
288 };
289
290 #[cfg(panic = "unwind")]
291 {
292 let catch = std::panic::catch_unwind(init);
293
294 match catch {
295 Ok(Ok(_)) => 0,
296 Ok(Err(e)) => {
297 error!("Could not initialize Rat: {e}.");
298 -1
299 }
300 Err(_) => {
301 error!("Rust did panic unexpectedly.");
302 -2
303 }
304 }
305 }
306
307 #[cfg(not(panic = "unwind"))]
308 {
309 let d = init();
310 match d {
311 Ok(_) => 0,
312 Err(e) => {
313 error!("Could not initialize Rat: {e}.");
314 -1
315 }
316 }
317 }
318}
319
320#[unsafe(no_mangle)]
321pub unsafe extern "C" fn rat_deinit() -> i32 {
324 #[cfg(panic = "unwind")]
325 {
326 let catch = std::panic::catch_unwind(deinit);
327
328 match catch {
329 Ok(Ok(_)) => 0,
330 Ok(Err(e)) => {
331 error!("Could not deinitialize Rat: {e}.");
332 -1
333 }
334 Err(_) => {
335 error!("Rust did panic unexpectedly.");
336 -2
337 }
338 }
339 }
340
341 #[cfg(not(panic = "unwind"))]
342 {
343 let d = deinit();
344 match d {
345 Ok(_) => 0,
346 Err(e) => {
347 error!("Could not deinitialize Rat: {e}.");
348 -1
349 }
350 }
351 }
352}
353
354#[unsafe(no_mangle)]
355pub unsafe extern "C" fn rat_bacon_f32(
359 variable_name: *const CFfiString,
360 data: *mut f32,
361 rows: usize,
362 cols: usize,
363) -> i32 {
364 let f = || {
365 let variable_name = unsafe { std::ffi::CStr::from_ptr(variable_name) };
366 let variable_name = variable_name.to_str().unwrap();
367
368 let data = unsafe { std::slice::from_raw_parts_mut(data, rows * cols) };
369 let matrix = nalgebra::DMatrix::from_column_slice(rows, cols, data);
370
371 let mut net_mat = NetArray::from(matrix);
372 bacon(variable_name, &mut net_mat, VariableType::F32).map(|_| {
373 let matrix: nalgebra::DMatrix<f32> = net_mat.into();
374 for c in 0..cols {
375 for r in 0..rows {
376 data[c * rows + r] = matrix[(r, c)];
377 }
378 }
379 })
380 };
381
382 #[cfg(panic = "unwind")]
383 {
384 let catch = std::panic::catch_unwind(f);
385
386 match catch {
387 Ok(Ok(_)) => 0,
388 Ok(Err(e)) => {
389 error!("Failed to bacon: {}", e);
390 -1
391 }
392 Err(_) => {
393 error!("Rust did panic unexpectedly.");
394 -2
395 }
396 }
397 }
398
399 #[cfg(not(panic = "unwind"))]
400 {
401 let d = f();
402 match d {
403 Ok(_) => 0,
404 Err(e) => {
405 error!("Could not deinitialize Rat: {e}.");
406 -1
407 }
408 }
409 }
410}
411
412#[unsafe(no_mangle)]
413pub unsafe extern "C" fn rat_bacon_f64(
417 variable_name: *const CFfiString,
418 data: *mut f64,
419 rows: usize,
420 cols: usize,
421) -> i32 {
422 let f = || {
423 let variable_name = unsafe { std::ffi::CStr::from_ptr(variable_name) };
424 let variable_name = variable_name.to_str().unwrap();
425
426 let data = unsafe { std::slice::from_raw_parts_mut(data, rows * cols) };
427 let matrix = nalgebra::DMatrix::from_column_slice(rows, cols, data);
428
429 let mut net_mat = NetArray::from(matrix);
430 bacon(variable_name, &mut net_mat, VariableType::F64).map(|_| {
431 let matrix: nalgebra::DMatrix<f64> = net_mat.into();
432 for c in 0..cols {
433 for r in 0..rows {
434 data[c * rows + r] = matrix[(r, c)];
435 }
436 }
437 })
438 };
439
440 #[cfg(panic = "unwind")]
441 {
442 let catch = std::panic::catch_unwind(f);
443
444 match catch {
445 Ok(Ok(_)) => 0,
446 Ok(Err(e)) => {
447 error!("Failed to bacon: {}", e);
448 -1
449 }
450 Err(_) => {
451 error!("Rust did panic unexpectedly.");
452 -2
453 }
454 }
455 }
456 #[cfg(not(panic = "unwind"))]
457 {
458 let d = f();
459 match d {
460 Ok(_) => 0,
461 Err(e) => {
462 error!("Could not deinitialize Rat: {e}.");
463 -1
464 }
465 }
466 }
467}
468
469#[unsafe(no_mangle)]
470pub unsafe extern "C" fn rat_bacon_i32(
474 variable_name: *const CFfiString,
475 data: *mut i32,
476 rows: usize,
477 cols: usize,
478) -> i32 {
479 let f = || {
480 let variable_name = unsafe { std::ffi::CStr::from_ptr(variable_name) };
481 let variable_name = variable_name.to_str().unwrap();
482
483 let data = unsafe { std::slice::from_raw_parts_mut(data, rows * cols) };
484 let matrix = nalgebra::DMatrix::from_column_slice(rows, cols, data);
485
486 let mut net_mat = NetArray::from(matrix);
487 bacon(variable_name, &mut net_mat, VariableType::I32).map(|_| {
488 let matrix: nalgebra::DMatrix<i32> = net_mat.into();
489 for c in 0..cols {
490 for r in 0..rows {
491 data[c * rows + r] = matrix[(r, c)];
492 }
493 }
494 })
495 };
496
497 #[cfg(panic = "unwind")]
498 {
499 let catch = std::panic::catch_unwind(f);
500
501 match catch {
502 Ok(Ok(_)) => 0,
503 Ok(Err(e)) => {
504 error!("Failed to bacon: {}", e);
505 -1
506 }
507 Err(_) => {
508 error!("Rust did panic unexpectedly.");
509 -2
510 }
511 }
512 }
513 #[cfg(not(panic = "unwind"))]
514 {
515 let d = f();
516 match d {
517 Ok(_) => 0,
518 Err(e) => {
519 error!("Could not deinitialize Rat: {e}.");
520 -1
521 }
522 }
523 }
524}
525
526#[unsafe(no_mangle)]
527pub unsafe extern "C" fn rat_bacon_u8(
531 variable_name: *const CFfiString,
532 data: *mut u8,
533 rows: usize,
534 cols: usize,
535) -> i32 {
536 let f = || {
537 let variable_name = unsafe { std::ffi::CStr::from_ptr(variable_name) };
538 let variable_name = variable_name.to_str().unwrap();
539
540 let data = unsafe { std::slice::from_raw_parts_mut(data, rows * cols) };
541 let matrix = nalgebra::DMatrix::from_column_slice(rows, cols, data);
542
543 let mut net_mat = NetArray::from(matrix);
544 bacon(variable_name, &mut net_mat, VariableType::U8).map(|_| {
545 let matrix: nalgebra::DMatrix<u8> = net_mat.into();
546 for c in 0..cols {
547 for r in 0..rows {
548 data[c * rows + r] = matrix[(r, c)];
549 }
550 }
551 })
552 };
553
554 #[cfg(panic = "unwind")]
555 {
556 let catch = std::panic::catch_unwind(f);
557
558 match catch {
559 Ok(Ok(_)) => 0,
560 Ok(Err(e)) => {
561 error!("Failed to bacon: {}", e);
562 -1
563 }
564 Err(_) => {
565 error!("Rust did panic unexpectedly.");
566 -2
567 }
568 }
569 }
570 #[cfg(not(panic = "unwind"))]
571 {
572 let d = f();
573 match d {
574 Ok(_) => 0,
575 Err(e) => {
576 error!("Could not deinitialize Rat: {e}.");
577 -1
578 }
579 }
580 }
581}