1use anyhow::anyhow;
2use log::{error, info};
3use rkyv::{
4 api::high::{HighSerializer, HighValidator},
5 bytecheck::CheckBytes,
6 de::Pool,
7 rancor::Strategy,
8 ser::allocator::ArenaHandle,
9 util::AlignedVec,
10};
11use std::sync::{Arc, LazyLock, Mutex};
12
13use mt_sea::{ship::NetworkShipImpl, *};
14
15pub use mt_sea::VariableType;
16pub use mt_sea::net::NetArray;
17pub use rkyv::{Archive, Deserialize, Serialize};
18
19pub struct Rat {
20 name: String,
21 ship: Option<NetworkShipImpl>,
22}
23
24pub fn rfalse() -> NetArray<u8> {
25 nalgebra::DMatrix::<u8>::zeros(1, 1).into()
26}
27
28pub fn rtrue() -> NetArray<u8> {
29 let rf = rfalse();
30 let mut rf = nalgebra::DMatrix::<u8>::from(rf);
31 unsafe { *rf.get_unchecked_mut((0, 0)) = 1 };
32 rf.into()
33}
34
35static RT: LazyLock<Mutex<Option<Arc<tokio::runtime::Runtime>>>> =
36 LazyLock::new(|| Mutex::new(None));
37static RAT: LazyLock<Mutex<Option<Rat>>> = LazyLock::new(|| Mutex::new(None));
38
39impl Rat {
40 fn create(
41 name: &str,
42 timeout: Option<std::time::Duration>,
43 rt: Arc<tokio::runtime::Runtime>,
44 ) -> anyhow::Result<Self> {
45 let ship = rt.block_on(async {
46 let init_future =
47 mt_sea::ship::NetworkShipImpl::init(ShipKind::Rat(name.to_string()), false);
48
49 match timeout {
50 None => Ok(Some(init_future.await?)),
51 Some(t) => match tokio::time::timeout(t, init_future).await {
52 Err(_) => Ok::<Option<NetworkShipImpl>, anyhow::Error>(None),
53 Ok(t) => Ok(Some(t?)),
54 },
55 }
56 })?;
57
58 Ok(Self {
59 name: name.to_string(),
60 ship,
61 })
62 }
63}
64
65pub fn init(
66 node_name: &str,
67 timeout: Option<std::time::Duration>,
68 runtime: Option<Arc<tokio::runtime::Runtime>>,
69) -> anyhow::Result<()> {
70 let mut rat_arc = RAT
71 .lock()
72 .map_err(|e| anyhow::anyhow!("Failed to lock rat: {}", e))?;
73
74 if rat_arc.is_some() {
75 return Err(anyhow::anyhow!("Rat already initialized"));
76 }
77
78 let mut srt = RT.lock().unwrap();
79 if let Some(rt) = runtime {
80 srt.replace(rt);
81 }
82
83 if srt.is_none() {
84 srt.replace(Arc::new(
85 tokio::runtime::Builder::new_current_thread()
86 .enable_all()
87 .build()
88 .unwrap(),
89 ));
90 }
91
92 let rt = srt.as_ref().expect("just set").clone();
93 let new_rat = Rat::create(node_name, timeout, rt)?;
94 rat_arc.replace(new_rat);
95
96 Ok(())
97}
98
99pub fn deinit() -> anyhow::Result<()> {
100 let mut rat_arc = RAT
101 .lock()
102 .map_err(|e| anyhow::anyhow!("Failed to lock rat: {}", e))?;
103
104 if rat_arc.is_none() {
105 return Err(anyhow::anyhow!("Rat not initialized"));
106 }
107
108 rat_arc.take();
109 Ok(())
110}
111
112pub fn bacon<T>(
116 variable_name: &str,
117 data: &mut T,
118 variable_type: VariableType,
119) -> anyhow::Result<()>
120where
121 T: Archive,
122 T::Archived: for<'a> CheckBytes<HighValidator<'a, rkyv::rancor::Error>>
123 + Deserialize<T, Strategy<Pool, rkyv::rancor::Error>>,
124 T: 'static + Send,
125 T: for<'a> Serialize<HighSerializer<AlignedVec, ArenaHandle<'a>, rkyv::rancor::Error>>,
126 T: Send + Sync,
127{
128 let rat_arc = RAT
129 .lock()
130 .map_err(|e| anyhow::anyhow!("Failed to lock rat: {}", e))?;
131
132 let rat = rat_arc
133 .as_ref()
134 .ok_or(anyhow::anyhow!("Rat not initialized"))?;
135
136 let srt = RT.lock().unwrap();
137 let rt = srt.as_ref().ok_or(anyhow!(
138 "Async Runtime not initialized. Call init() before calling bacon()."
139 ))?;
140
141 if let Some(rat_ship) = rat.ship.as_ref() {
142 rt.block_on(async move {
143 match rat_ship.ask_for_action(variable_name).await {
144 Ok((mt_sea::Action::Sail, lock_until_ack)) => {
145 info!("Rat {} sails for variable {}", rat.name, variable_name);
146 let receiver = lock_until_ack.then_some({
147 let client = rat_ship.client.lock().await;
148 let sender = client.coordinator_receive.read().unwrap();
149 sender
150 .as_ref()
151 .expect("How are we receiving anything in the client? :)")
152 .subscribe()
153 });
154
155 if let Some(mut receiver) = receiver {
156 info!("Locked...");
157 loop {
158 let (packet, _) = receiver.recv().await?;
159 if matches!(packet.data, net::PacketKind::Acknowledge) {
160 break;
161 }
162 }
163 info!("Unlocked");
164 }
165
166 Ok(())
167 }
168 Ok((mt_sea::Action::Shoot { target, id }, lock_until_ack)) => {
169 info!("Rat {} shoots {} at {:?}", rat.name, variable_name, target);
170
171 let receiver = lock_until_ack.then_some({
172 let client = rat_ship.client.lock().await;
173 let sender = client.coordinator_receive.read().unwrap();
174 sender
175 .as_ref()
176 .expect("How are we receiving anything in the client? :)")
177 .subscribe()
178 });
179
180 rat_ship
181 .get_cannon()
182 .shoot(&target, id, data, variable_type, variable_name)
183 .await?;
184
185 if let Some(mut receiver) = receiver {
186 info!("Locked...");
187 loop {
188 let (packet, _) = receiver.recv().await?;
189 if matches!(packet.data, net::PacketKind::Acknowledge) {
190 break;
191 }
192 }
193 info!("Unlocked");
194 }
195
196 info!(
197 "Rat {} finished shooting {} at {:?}",
198 rat.name, variable_name, target
199 );
200
201 Ok(())
202 }
203 Ok((mt_sea::Action::Catch { source, id }, lock_until_ack)) => {
204 info!(
205 "Rat {} catches {} from {:?}",
206 rat.name, variable_name, source
207 );
208
209 let receiver = lock_until_ack.then_some({
210 let client = rat_ship.client.lock().await;
211 let sender = client.coordinator_receive.read().unwrap();
212 sender
213 .as_ref()
214 .expect("How are we receiving anything in the client? :)")
215 .subscribe()
216 });
217
218 let mut recv_data = rat_ship.get_cannon().catch::<T>(id).await?;
219
220 info!(
221 "Rat {} finished catching {} from {:?}",
222 rat.name, variable_name, source
223 );
224
225 *data = recv_data.remove(0);
227
228 if let Some(mut receiver) = receiver {
229 info!("Locked...");
230 loop {
231 let (packet, _) = receiver.recv().await?;
232 if matches!(packet.data, net::PacketKind::Acknowledge) {
233 break;
234 }
235 }
236 info!("Unlocked");
237 }
238
239 Ok(())
240 }
241 Err(e) => {
242 error!("Failed to get action: {}", e);
243 Err(e)
244 }
245 }
246 })
247 } else {
248 Ok(())
249 }
250}
251
252#[cfg(all(target_arch = "aarch64", target_vendor = "apple"))]
254type CFfiString = i8;
255
256#[cfg(all(
257 not(target_arch = "x86"),
258 not(target_arch = "x86_64"),
259 not(target_vendor = "apple")
260))]
261type CFfiString = u8;
262
263#[cfg(any(
264 target_arch = "x86",
265 target_arch = "x86_64",
266 all(target_vendor = "apple", not(target_arch = "aarch64"))
267))]
268type CFfiString = i8;
269
270#[unsafe(no_mangle)]
271pub unsafe extern "C" fn rat_init(node_name: *const CFfiString, timeout_secs: i32) -> i32 {
274 let init = || {
275 let node_name = unsafe { std::ffi::CStr::from_ptr(node_name) };
276 let node_name = node_name.to_str().unwrap();
277
278 let timeout = if timeout_secs <= 0 {
279 None
280 } else {
281 Some(std::time::Duration::from_secs(timeout_secs as u64))
282 };
283
284 init(node_name, timeout, None)
285 };
286
287 #[cfg(panic = "unwind")]
288 {
289 let catch = std::panic::catch_unwind(init);
290
291 match catch {
292 Ok(Ok(_)) => 0,
293 Ok(Err(e)) => {
294 error!("Could not initialize Rat: {e}.");
295 -1
296 }
297 Err(_) => {
298 error!("Rust did panic unexpectedly.");
299 -2
300 }
301 }
302 }
303
304 #[cfg(not(panic = "unwind"))]
305 {
306 let d = init();
307 match d {
308 Ok(_) => 0,
309 Err(e) => {
310 error!("Could not initialize Rat: {e}.");
311 -1
312 }
313 }
314 }
315}
316
317#[unsafe(no_mangle)]
318pub unsafe extern "C" fn rat_deinit() -> i32 {
321 #[cfg(panic = "unwind")]
322 {
323 let catch = std::panic::catch_unwind(deinit);
324
325 match catch {
326 Ok(Ok(_)) => 0,
327 Ok(Err(e)) => {
328 error!("Could not deinitialize Rat: {e}.");
329 -1
330 }
331 Err(_) => {
332 error!("Rust did panic unexpectedly.");
333 -2
334 }
335 }
336 }
337
338 #[cfg(not(panic = "unwind"))]
339 {
340 let d = deinit();
341 match d {
342 Ok(_) => 0,
343 Err(e) => {
344 error!("Could not deinitialize Rat: {e}.");
345 -1
346 }
347 }
348 }
349}
350
351#[unsafe(no_mangle)]
352pub unsafe extern "C" fn rat_bacon_f32(
356 variable_name: *const CFfiString,
357 data: *mut f32,
358 rows: usize,
359 cols: usize,
360) -> i32 {
361 let f = || {
362 let variable_name = unsafe { std::ffi::CStr::from_ptr(variable_name) };
363 let variable_name = variable_name.to_str().unwrap();
364
365 let data = unsafe { std::slice::from_raw_parts_mut(data, rows * cols) };
366 let matrix = nalgebra::DMatrix::from_column_slice(rows, cols, data);
367
368 let mut net_mat = NetArray::from(matrix);
369 bacon(variable_name, &mut net_mat, VariableType::F32).map(|_| {
370 let matrix: nalgebra::DMatrix<f32> = net_mat.into();
371 for c in 0..cols {
372 for r in 0..rows {
373 data[c * rows + r] = matrix[(r, c)];
374 }
375 }
376 })
377 };
378
379 #[cfg(panic = "unwind")]
380 {
381 let catch = std::panic::catch_unwind(f);
382
383 match catch {
384 Ok(Ok(_)) => 0,
385 Ok(Err(e)) => {
386 error!("Failed to bacon: {}", e);
387 -1
388 }
389 Err(_) => {
390 error!("Rust did panic unexpectedly.");
391 -2
392 }
393 }
394 }
395
396 #[cfg(not(panic = "unwind"))]
397 {
398 let d = f();
399 match d {
400 Ok(_) => 0,
401 Err(e) => {
402 error!("Could not deinitialize Rat: {e}.");
403 -1
404 }
405 }
406 }
407}
408
409#[unsafe(no_mangle)]
410pub unsafe extern "C" fn rat_bacon_f64(
414 variable_name: *const CFfiString,
415 data: *mut f64,
416 rows: usize,
417 cols: usize,
418) -> i32 {
419 let f = || {
420 let variable_name = unsafe { std::ffi::CStr::from_ptr(variable_name) };
421 let variable_name = variable_name.to_str().unwrap();
422
423 let data = unsafe { std::slice::from_raw_parts_mut(data, rows * cols) };
424 let matrix = nalgebra::DMatrix::from_column_slice(rows, cols, data);
425
426 let mut net_mat = NetArray::from(matrix);
427 bacon(variable_name, &mut net_mat, VariableType::F64).map(|_| {
428 let matrix: nalgebra::DMatrix<f64> = net_mat.into();
429 for c in 0..cols {
430 for r in 0..rows {
431 data[c * rows + r] = matrix[(r, c)];
432 }
433 }
434 })
435 };
436
437 #[cfg(panic = "unwind")]
438 {
439 let catch = std::panic::catch_unwind(f);
440
441 match catch {
442 Ok(Ok(_)) => 0,
443 Ok(Err(e)) => {
444 error!("Failed to bacon: {}", e);
445 -1
446 }
447 Err(_) => {
448 error!("Rust did panic unexpectedly.");
449 -2
450 }
451 }
452 }
453 #[cfg(not(panic = "unwind"))]
454 {
455 let d = f();
456 match d {
457 Ok(_) => 0,
458 Err(e) => {
459 error!("Could not deinitialize Rat: {e}.");
460 -1
461 }
462 }
463 }
464}
465
466#[unsafe(no_mangle)]
467pub unsafe extern "C" fn rat_bacon_i32(
471 variable_name: *const CFfiString,
472 data: *mut i32,
473 rows: usize,
474 cols: usize,
475) -> i32 {
476 let f = || {
477 let variable_name = unsafe { std::ffi::CStr::from_ptr(variable_name) };
478 let variable_name = variable_name.to_str().unwrap();
479
480 let data = unsafe { std::slice::from_raw_parts_mut(data, rows * cols) };
481 let matrix = nalgebra::DMatrix::from_column_slice(rows, cols, data);
482
483 let mut net_mat = NetArray::from(matrix);
484 bacon(variable_name, &mut net_mat, VariableType::I32).map(|_| {
485 let matrix: nalgebra::DMatrix<i32> = net_mat.into();
486 for c in 0..cols {
487 for r in 0..rows {
488 data[c * rows + r] = matrix[(r, c)];
489 }
490 }
491 })
492 };
493
494 #[cfg(panic = "unwind")]
495 {
496 let catch = std::panic::catch_unwind(f);
497
498 match catch {
499 Ok(Ok(_)) => 0,
500 Ok(Err(e)) => {
501 error!("Failed to bacon: {}", e);
502 -1
503 }
504 Err(_) => {
505 error!("Rust did panic unexpectedly.");
506 -2
507 }
508 }
509 }
510 #[cfg(not(panic = "unwind"))]
511 {
512 let d = f();
513 match d {
514 Ok(_) => 0,
515 Err(e) => {
516 error!("Could not deinitialize Rat: {e}.");
517 -1
518 }
519 }
520 }
521}
522
523#[unsafe(no_mangle)]
524pub unsafe extern "C" fn rat_bacon_u8(
528 variable_name: *const CFfiString,
529 data: *mut u8,
530 rows: usize,
531 cols: usize,
532) -> i32 {
533 let f = || {
534 let variable_name = unsafe { std::ffi::CStr::from_ptr(variable_name) };
535 let variable_name = variable_name.to_str().unwrap();
536
537 let data = unsafe { std::slice::from_raw_parts_mut(data, rows * cols) };
538 let matrix = nalgebra::DMatrix::from_column_slice(rows, cols, data);
539
540 let mut net_mat = NetArray::from(matrix);
541 bacon(variable_name, &mut net_mat, VariableType::U8).map(|_| {
542 let matrix: nalgebra::DMatrix<u8> = net_mat.into();
543 for c in 0..cols {
544 for r in 0..rows {
545 data[c * rows + r] = matrix[(r, c)];
546 }
547 }
548 })
549 };
550
551 #[cfg(panic = "unwind")]
552 {
553 let catch = std::panic::catch_unwind(f);
554
555 match catch {
556 Ok(Ok(_)) => 0,
557 Ok(Err(e)) => {
558 error!("Failed to bacon: {}", e);
559 -1
560 }
561 Err(_) => {
562 error!("Rust did panic unexpectedly.");
563 -2
564 }
565 }
566 }
567 #[cfg(not(panic = "unwind"))]
568 {
569 let d = f();
570 match d {
571 Ok(_) => 0,
572 Err(e) => {
573 error!("Could not deinitialize Rat: {e}.");
574 -1
575 }
576 }
577 }
578}