1#![allow(clippy::module_name_repetitions)]
4mod builder;
5mod errors;
6pub(crate) mod keys;
7mod ping;
8use super::timefail;
9pub use builder::Builder;
10pub use errors::LogErr;
11use fsipc::legacy::{PreparedPoint, Transaction};
12use modio_logger_db::{fxtime, Datastore};
13pub use ping::LoggerPing;
14use tracing::{debug, error, info, instrument, warn};
15use zbus::{interface, object_server::SignalEmitter};
16
17#[derive(Clone)]
18pub struct Logger {
19 ds: Datastore,
20 timefail: timefail::Timefail,
21}
22
23impl Logger {
24 pub async fn new(timefail: timefail::Timefail, ds: Datastore) -> Result<Self, LogErr> {
25 if timefail.is_timefail() {
26 info!("Failing all pending change requests due to TIMEFAIL");
27 ds.transaction_fail_pending()
28 .await
29 .inspect_err(|e| error!("Failed to remove pending transactions: {:?}", e))?;
30 }
31 ds.fail_queued_transactions()
32 .await
33 .inspect_err(|e| error!("Failed to remove queued transactions: {:?}", e))?;
34 Ok(Self { ds, timefail })
35 }
36
37 #[instrument(skip(self))]
38 pub async fn periodic(&self) -> Result<(), LogErr> {
39 debug!("Doing periodic tasks, persistance, timefail");
40 if self.ds.should_persist().await? {
41 self.ds
42 .persist_data()
43 .await
44 .inspect_err(|e| error!("Failed to persist datastore: {:?}", e))?;
45 };
46 if self.timefail.is_adjust() {
47 if let Some(adjust) = self.timefail.get_adjust().await {
48 info!("Time jump has happened, adjusting data with {adjust}");
49 let count = self.ds.fix_timefail(adjust).await?;
50 if count > 0 {
51 info!("Adjusted timestamps of {count} metrics");
52 };
53 self.timefail
54 .remove_adjust()
55 .await
56 .inspect_err(|e| error!("Failed to remove timefail file. err={:?}", e))?;
57 }
58 }
59 let stats = self
60 .ds
61 .get_statistics()
62 .await
63 .inspect_err(|e| error!("Failed to gather DB statistics: {:?}", e))?;
64 info!("Current stats: {stats}");
65 Ok(())
66 }
67 #[cfg(test)]
68 pub const fn ds(&self) -> &Datastore {
69 &self.ds
70 }
71
72 #[must_use]
73 pub const fn builder() -> Builder {
74 Builder::new()
75 }
76}
77
78async fn get_mac() -> String {
79 use async_std::fs;
80 let wan = std::path::Path::new("/sys/class/net/wan/address");
81 let mut res = fs::read_to_string(&wan)
82 .await
83 .unwrap_or_else(|_| String::from("00:00:00:00:00:00"));
84 res.make_ascii_lowercase();
86 res.retain(|c| matches!(c, '0'..='9' | 'a'..='f'));
88 res
89}
90
91#[allow(clippy::use_self)]
92#[interface(name = "se.modio.logger.fsipc")]
93impl Logger {
94 #[allow(clippy::unused_self)]
95 const fn ping(&self) -> &str {
96 "Ping? Pong"
97 }
98 #[allow(clippy::unused_self)]
99 fn valid_key(&mut self, key: &str) -> bool {
100 keys::valid_key(key).is_ok()
101 }
102
103 #[allow(clippy::unused_self)]
104 async fn get_boxid(&self) -> String {
105 get_mac().await
106 }
107
108 #[instrument(skip(self))]
109 async fn retrieve(&mut self, key: &str) -> Result<(fsipc::legacy::Measure,), LogErr> {
110 keys::valid_key(key)?;
111 let dat = self
112 .ds
113 .get_last_datapoint(key)
114 .await
115 .inspect_err(|e| debug!("Failed to retrieve last datapoint. err={:?}", e))?;
118 let val = fsipc::legacy::Measure::from(dat);
119 Ok((val,))
120 }
121
122 #[instrument(skip(self))]
123 async fn retrieve_all(&mut self) -> Result<Vec<fsipc::legacy::Measure>, LogErr> {
124 let result = self
125 .ds
126 .get_latest_logdata()
127 .await
128 .inspect_err(|e| error!("Failed to get latest logdata. err={:?}", e))?;
129 let res: Vec<fsipc::legacy::Measure> = result
130 .into_iter()
131 .map(fsipc::legacy::Measure::from)
132 .collect();
133 Ok(res)
134 }
135
136 #[zbus(signal)]
138 async fn store_signal(
139 ctxt: &SignalEmitter<'_>,
140 key: &str,
141 value: &str,
142 when: u64,
143 ) -> zbus::Result<()>;
144
145 #[instrument(skip_all)]
146 async fn store(
147 &self,
148 #[zbus(signal_context)] ctxt: SignalEmitter<'_>,
149 key: &str,
150 value: &str,
151 ) -> Result<(), LogErr> {
152 keys::valid_key(key)?;
153 let when = fxtime();
154 let timefail = self.timefail.is_timefail();
155 self.ds
156 .insert(key, value, when, timefail)
157 .await
158 .inspect_err(|e| error!("Failed to store single metric. err={:?}", e))?;
159
160 #[allow(
161 clippy::cast_precision_loss,
162 clippy::cast_sign_loss,
163 clippy::cast_possible_truncation
164 )]
165 let trunc_when = when as u64;
166
167 if !key.starts_with("modio.") {
168 Self::store_signal(&ctxt, key, value, trunc_when).await?;
169 };
170 Ok(())
171 }
172
173 #[instrument(skip_all)]
174 async fn store_with_time(
175 &mut self,
176 #[zbus(signal_context)] ctxt: SignalEmitter<'_>,
177 key: &str,
178 value: &str,
179 when: u64,
180 ) -> Result<(), LogErr> {
181 keys::valid_key(key)?;
182 let timefail = false;
185
186 #[allow(clippy::cast_precision_loss)]
188 let db_when = when as f64;
189
190 self.ds
191 .insert(key, value, db_when, timefail)
192 .await
193 .inspect_err(|e| error!("Failed to store single metric with timestamp. err={:?}", e))?;
194 if !key.starts_with("modio.") {
195 Self::store_signal(&ctxt, key, value, when).await?;
196 };
197 Ok(())
198 }
199 #[zbus(signal)]
201 async fn transaction_added(ctxt: &SignalEmitter<'_>, key: &str) -> zbus::Result<()>;
202
203 #[instrument(skip_all)]
204 async fn transaction_add(
205 &mut self,
206 #[zbus(signal_context)] ctxt: SignalEmitter<'_>,
207 key: &str,
208 expected: &str,
209 target: &str,
210 token: &str,
211 ) -> Result<(), LogErr> {
212 keys::valid_key(key)?;
213 keys::valid_token(token)?;
214 if self
215 .ds
216 .has_transaction(token)
217 .await
218 .inspect_err(|e| error!("Failed to check for transaction for key. err={:?}", e))?
219 {
220 warn!(
221 "Duplicate transaction (key: {}, token: {}) Ignoring for backwards compatibility.",
222 key, token
223 );
224 return Ok(());
225 }
226 self.ds
227 .transaction_add(key, expected, target, token)
228 .await
229 .inspect_err(|e| error!("Failed to add transaction for key. err={:?}", e))?;
230 Self::transaction_added(&ctxt, key).await?;
231 Ok(())
232 }
233
234 #[instrument(skip(self))]
235 async fn transaction_get(&mut self, prefix: &str) -> Result<Vec<Transaction>, LogErr> {
236 debug!("Retrieving transactions beginning with {}", prefix);
237 let res = self
238 .ds
239 .transaction_get(prefix)
240 .await
241 .inspect_err(|e| error!("Failed to get transaction for key. err={:?}", e))?;
242
243 let res: Vec<Transaction> = res.into_iter().map(Transaction::from).collect();
244 Ok(res)
245 }
246
247 #[instrument(skip(self))]
248 async fn transaction_fail(&mut self, t_id: u64) -> Result<(), LogErr> {
249 debug!("Marking transaction: t_id={}, failed", t_id);
250 let timefail = self.timefail.is_timefail();
251
252 #[allow(clippy::cast_possible_wrap)]
254 let t_id = t_id as i64;
255 let count = self
256 .ds
257 .transaction_fail(t_id, timefail)
258 .await
259 .inspect_err(|e| error!("Failed to mark transaction as failed. err={:?}", e))?;
260 if count > 0 {
261 Ok(())
262 } else {
263 Err(LogErr::TransactionNotFound)
264 }
265 }
266
267 #[instrument(skip(self))]
268 async fn transaction_pass(&mut self, t_id: u64) -> Result<(), LogErr> {
269 debug!("Marking transaction: t_id={}, passed ", t_id);
270 let timefail = self.timefail.is_timefail();
271 #[allow(clippy::cast_possible_wrap)]
273 let t_id = t_id as i64;
274 let count = self
275 .ds
276 .transaction_pass(t_id, timefail)
277 .await
278 .inspect_err(|e| error!("Failed to mark transaction as passed. err={:?}", e))?;
279 if count > 0 {
280 Ok(())
281 } else {
282 Err(LogErr::TransactionNotFound)
283 }
284 }
285
286 #[instrument(skip(self))]
287 async fn prepare_datapoints(&mut self, maximum: u32) -> Result<Vec<PreparedPoint>, LogErr> {
288 prepare_range_check(maximum)?;
289 let data = self
290 .ds
291 .get_batch(maximum)
292 .await
293 .inspect_err(|e| error!("Failed to get batch of datapoints. err={:?}", e))?;
294 let result: Vec<PreparedPoint> = data.into_iter().map(PreparedPoint::from).collect();
295 Ok(result)
296 }
297
298 #[instrument(skip(self))]
299 async fn prepare_modio_datapoints(
300 &mut self,
301 maximum: u32,
302 ) -> Result<Vec<PreparedPoint>, LogErr> {
303 prepare_range_check(maximum)?;
304 let data = self
305 .ds
306 .get_internal_batch(maximum)
307 .await
308 .inspect_err(|e| error!("Failed to get_internal_batch. err={:?}", e))?;
309 let result: Vec<PreparedPoint> = data.into_iter().map(PreparedPoint::from).collect();
310 Ok(result)
311 }
312
313 #[instrument(skip_all)]
314 async fn remove_prepared(&mut self, items: Vec<i64>) -> Result<(), LogErr> {
315 prepare_remove_check(&items)?;
316 self.ds
317 .drop_batch(&items)
318 .await
319 .inspect_err(|e| error!("Failed to delete batch of submitted items. err={:?}", e))?;
320 Ok(())
321 }
322}
323
324fn prepare_remove_check(items: &[i64]) -> Result<(), PreparedError> {
326 if items.is_empty() {
327 return Err(PreparedError::Empty);
328 }
329 if items.iter().any(|x| *x < 0_i64) {
330 return Err(PreparedError::InvalidIndex);
331 }
332 Ok(())
333}
334
335#[derive(thiserror::Error, Debug)]
336pub enum PreparedError {
337 #[error("Too few items")]
338 TooSmall,
339 #[error("Too many items")]
340 TooMany,
341 #[error("Empty item set")]
342 Empty,
343 #[error("Invalid index")]
344 InvalidIndex,
345}
346
347impl From<PreparedError> for LogErr {
349 fn from(e: PreparedError) -> Self {
350 Self::InvalidPrepared(e.to_string())
351 }
352}
353
354const fn prepare_range_check(num: u32) -> Result<(), PreparedError> {
359 match num {
360 0 => Err(PreparedError::TooSmall),
361 1..=250 => Ok(()),
362 _ => Err(PreparedError::TooMany),
363 }
364}
365
366#[cfg(test)]
367pub mod tests {
368 use super::*;
369
370 use crate::testing::{Tempbase, TestServer};
371 use fsipc::unixtime;
372 use std::error::Error;
373 use test_log::test;
374 use timeout_macro::timeouttest;
375
376 type TestResult = Result<(), Box<dyn Error>>;
377
378 use futures_util::{FutureExt, StreamExt};
379
380 #[test(timeouttest)]
381 async fn ping_pong_test() -> TestResult {
382 let server = TestServer::new(line!()).await?;
383 let proxy = server.proxy().await?;
384 let first = proxy.ping().await?;
385 let second = proxy.ping().await?;
386 assert_eq!(first, "Ping? Pong");
387 assert_eq!(second, "Ping? Pong");
388 Ok(())
389 }
390
391 #[test(timeouttest)]
392 async fn done_gives_error_test() -> TestResult {
393 let server = TestServer::new(line!()).await?;
394 let proxy = server.proxy().await?;
395 let res = proxy.done().await;
396 assert!(res.is_err());
397 Ok(())
398 }
399
400 #[test(timeouttest)]
401 async fn store_retrieve() -> TestResult {
402 let server = TestServer::new(line!()).await?;
403 let proxy = server.proxy().await?;
404 proxy.store("test.key", "abc123").await?;
405 let m = proxy.retrieve("test.key").await?;
406 assert_eq!(m.key, "test.key");
407 assert_eq!(m.value, "abc123");
408 Ok(())
409 }
410
411 #[test(timeouttest)]
412 async fn store_buffer() -> TestResult {
413 let server = TestServer::new(line!()).await?;
414 let proxy = server.proxy().await?;
415 proxy
416 .store_with_time("test.key", "abc123", 1_494_602_107)
417 .await?;
418 proxy
419 .store_with_time("test.key", "abc1234", 1_494_602_108)
420 .await?;
421 let m = proxy.retrieve("test.key").await?;
422 assert_eq!(m.key, "test.key");
423 assert_eq!(m.value, "abc1234");
424 assert_eq!(m.timestamp, 1_494_602_108);
425 Ok(())
426 }
427
428 #[test(timeouttest)]
429 async fn retrieve_all_test() -> TestResult {
430 let server = TestServer::new(line!()).await?;
431 let proxy = server.proxy().await?;
432 debug!("I have a party here");
433 proxy
434 .store_with_time("test.key", "abc123", 1_494_602_107)
435 .await?;
436 proxy
437 .store_with_time("test.key", "abc1234", 1_494_602_108)
438 .await?;
439 proxy.store("test.key2", "abcdefg").await?;
440
441 let all = proxy.retrieve_all().await?;
442 assert_eq!(all.len(), 2);
443 let m0 = all.first().expect("Should have value");
444 assert_eq!(m0.key, "test.key");
445 assert_eq!(m0.value, "abc1234");
446 assert_eq!(m0.timestamp, 1_494_602_108);
447 let m1 = all.get(1).expect("Should have value");
448 assert_eq!(m1.key, "test.key2");
449 assert_eq!(m1.value, "abcdefg");
450 Ok(())
451 }
452
453 #[test(timeouttest)]
454 async fn transaction_adding_test() -> TestResult {
455 let server = TestServer::new(line!()).await?;
456 let proxy = server.proxy().await?;
457 proxy
458 .transaction_add("test.test.one", "first", "second", "012")
459 .await?;
460 proxy
461 .transaction_add("dummy.test.one", "should not", "be present", "013")
462 .await?;
463 let transactions = proxy.transaction_get("test.test").await?;
464 assert_eq!(transactions.len(), 1);
465 let res = &transactions[0];
466 assert_eq!(res.key, "test.test.one");
467 assert_eq!(res.t_id, 1, "Transaction ID mismatch");
468 Ok(())
469 }
470
471 #[test(timeouttest)]
472 async fn transaction_dupe_adding_test() -> TestResult {
473 let server = TestServer::new(line!()).await?;
474 let proxy = server.proxy().await?;
475 proxy
476 .transaction_add("test.test.one", "first", "second", "1638290048")
477 .await?;
478 let res = proxy
479 .transaction_add("test.test.one", "first", "second", "1638290048")
480 .await;
481 res.expect("duplicated tokens should not cause error");
482 let transactions = proxy.transaction_get("test.test").await?;
483 assert_eq!(transactions.len(), 1);
484 Ok(())
485 }
486
487 #[test(timeouttest)]
488 async fn transaction_signal_test() -> TestResult {
489 let server = TestServer::new(line!()).await?;
490 let logger = server.proxy().await?;
491 let mut stream = logger.receive_transaction_added().await?;
492 logger
493 .transaction_add("test.test.transaction_signal", "first", "second", "012")
494 .await?;
495
496 let signal = stream.next().await.unwrap();
497 let payload = signal.args()?;
498 assert_eq!(payload.key, "test.test.transaction_signal");
499 Ok(())
500 }
501
502 #[test(timeouttest)]
503 async fn transaction_passing_test() -> TestResult {
504 let server = TestServer::new(line!()).await?;
505 let logger = server.proxy().await?;
506 logger
507 .transaction_add("test.test.one", "first", "second", "012")
508 .await?;
509 logger
510 .transaction_add("test.test.two", "uno", "dos", "0113")
511 .await?;
512 let trans = logger.transaction_get("test.test").await?;
513 logger.transaction_fail(trans[0].t_id).await?;
514 logger.transaction_pass(trans[1].t_id).await?;
515 logger
516 .transaction_add("test.test.three", "etta", "tvåa", "0114")
517 .await?;
518 let transactions = logger.transaction_get("test.test").await?;
519 assert_eq!(transactions.len(), 1);
520 let res = &transactions[0];
521 assert_eq!(res.key, "test.test.three");
522 assert_eq!(res.t_id, 3, "Transaction id mismatch");
523 Ok(())
524 }
525
526 #[test(timeouttest)]
527 async fn retrieving_data_test() -> TestResult {
528 let server = TestServer::new(line!()).await?;
529 let ipc = server.proxy().await?;
530 ipc.store("test.test.one", "first").await?;
531 ipc.store("test.test.one", "second").await?;
532 ipc.store("test.test.one", "third").await?;
533
534 ipc.store("test.test.two", "1").await?;
535 ipc.store("test.test.two", "2").await?;
536 ipc.store("test.test.two", "3").await?;
537
538 let res = ipc.retrieve_all().await?;
539 for measure in &res {
540 let data = ipc.retrieve(&measure.key).await?;
541 assert_eq!(data.key, measure.key);
542 }
543 Ok(())
544 }
545
546 #[test(timeouttest)]
547 async fn valid_key_test() -> TestResult {
548 let server = TestServer::new(line!()).await?;
549 let ipc = server.proxy().await?;
550 assert!(ipc.valid_key("modio.software.development").await?);
551 assert!(ipc.valid_key("abc").await?);
552 assert!(ipc.valid_key("a.b.c").await?);
553 assert!(ipc.valid_key("a_b.c").await?);
554 Ok(())
555 }
556
557 #[test(timeouttest)]
558 async fn invalid_key_test() -> TestResult {
559 let server = TestServer::new(line!()).await?;
560 let ipc = server.proxy().await?;
561 assert!(!ipc.valid_key("modio..invalid").await?);
562 assert!(!ipc.valid_key(".modio..invalid").await?);
563 assert!(!ipc.valid_key("modio.invalid.").await?);
564 assert!(!ipc.valid_key("modio. invalid").await?);
565 assert!(!ipc.valid_key("modio.in valid").await?);
566 assert!(!ipc.valid_key("modio.invalid ").await?);
567 assert!(!ipc.valid_key(" modio.invalid").await?);
568 Ok(())
569 }
570
571 #[test(timeouttest)]
572 async fn transaction_double() -> TestResult {
573 let server = TestServer::new(line!()).await?;
574 let ipc = server.proxy().await?;
575 let key = "test.test.one";
576 let first = "first";
577 let second = "second";
578 let guid4 = "02558af4-ed83-4441-bc56-4bdf3c57092f";
579 ipc.store(key, first).await?;
581
582 ipc.transaction_add(key, first, second, guid4).await?;
583 let transactions = ipc.transaction_get(key).await?;
584 let first_transaction = transactions
585 .first()
586 .expect("Should have at least one transaction");
587 let res = ipc.transaction_pass(first_transaction.t_id).await;
588 assert!(res.is_ok());
589 let res = ipc.transaction_pass(first_transaction.t_id).await;
590 assert!(res.is_err());
591 Ok(())
592 }
593
594 #[test(timeouttest)]
595 async fn transaction_tests() -> TestResult {
596 let server = TestServer::new(line!()).await?;
597 let ipc = server.proxy().await?;
598 let key = "test.test.one";
599 let first = "first";
600 let mut our_value = "first";
601 let second = "second";
602
603 ipc.store(key, our_value).await?;
604 let when = unixtime();
605
606 ipc.store_with_time(key, our_value, when).await?;
607
608 let guid1 = "04b56dcd-527a-4dd7-909c-a4111c035cbb";
609 ipc.transaction_add(key, first, second, guid1).await?;
610 let guid2 = "0000111001110222";
611 ipc.transaction_add(key, first, second, guid2).await?;
612 let guid3 = "ff505709-0119-46f4-87c8-f67f0a0d953b";
613 ipc.transaction_add(key, first, second, guid3).await?;
614
615 let transactions = ipc.transaction_get(key).await?;
616 for trn in &transactions {
619 if trn.key == key {
620 if our_value == trn.expected {
621 our_value = &trn.target;
622 ipc.transaction_pass(trn.t_id).await?;
623 } else {
624 ipc.transaction_fail(trn.t_id).await?;
625 }
626 }
627 ipc.store(key, our_value).await?;
628 }
629 let res = ipc.transaction_get(key).await?;
631 assert_eq!(res.len(), 0);
632
633 Ok(())
634 }
635
636 #[test(timeouttest)]
637 async fn no_modio_signals() -> TestResult {
638 let server = TestServer::new(line!()).await?;
639 let ipc = server.proxy().await?;
640 let mut stream = ipc.receive_store_signal().await?;
641 ipc.store("test.test.test", "value").await?;
643 let first = stream.next().await.unwrap();
644 let payload = first.args()?;
645 assert_eq!(payload.key, "test.test.test");
646
647 ipc.store("modio.test.test", "value").await?;
650 let second = stream.next().now_or_never();
651 assert!(second.is_none());
652 Ok(())
653 }
654
655 #[test(timeouttest)]
656 async fn submit_consume() -> TestResult {
657 let server = TestServer::new(line!()).await?;
658 let ipc = server.proxy().await?;
659 for x in 0..5 {
660 ipc.store_with_time("test.foo", &x.to_string(), unixtime() - 100)
661 .await?;
662 }
663 let vals = ipc.prepare_datapoints(10).await?;
664 assert_eq!(vals.len(), 5);
665
666 let point = &vals[0];
667 assert_eq!(point.key, "test.foo");
668 assert_eq!(point.value, "0");
669
670 let more = ipc.prepare_datapoints(10).await?;
671 assert_eq!(more.len(), 5);
673
674 let last = &vals[4];
675 assert_eq!(last.key, "test.foo");
676 assert_eq!(last.value, "4");
677
678 let to_remove: Vec<_> = vals.iter().map(|m| m.id).collect();
679
680 ipc.remove_prepared(to_remove).await?;
681 let after = ipc.prepare_datapoints(30).await?;
682 assert!(after.is_empty());
683 Ok(())
684 }
685
686 #[test(timeouttest)]
687 async fn submit_modio_consume() -> TestResult {
688 let server = TestServer::new(line!()).await?;
689 let ipc = server.proxy().await?;
690 for x in 0..5 {
691 ipc.store_with_time("test.foo", &x.to_string(), unixtime() - 100)
692 .await?;
693 ipc.store_with_time("modio.test.foo", &x.to_string(), unixtime() - 100)
694 .await?;
695 }
696
697 let vals = ipc.prepare_modio_datapoints(20).await?;
698 assert_eq!(vals.len(), 5);
699 for point in &vals {
700 assert_eq!(&point.key, "modio.test.foo");
701 }
702
703 let more = ipc.prepare_modio_datapoints(10).await?;
704 assert_eq!(more.len(), 5);
706
707 let to_remove: Vec<i64> = vals.iter().map(|x| x.id).collect();
710 ipc.remove_prepared(to_remove).await?;
711
712 let modio_after = ipc.prepare_modio_datapoints(30).await?;
713 assert!(modio_after.is_empty());
714
715 let after = ipc.prepare_datapoints(30).await?;
716 assert!(!after.is_empty());
717 Ok(())
718 }
719
720 #[test(timeouttest)]
723 async fn test_get_batch_expiry() -> TestResult {
724 let server = TestServer::new(line!()).await?;
725 let ipc = server.proxy().await?;
726 for x in 0..25 {
727 ipc.store("test.foo", &x.to_string()).await?;
728 }
729 let vals = ipc.prepare_datapoints(10).await?;
730 assert_eq!(
731 vals.len(),
732 0,
733 "Data should not automatically be flushed to disk"
734 );
735
736 ipc.store_with_time("test.foo", "26", unixtime() - 200)
737 .await?;
738 let vals = ipc.prepare_datapoints(10).await?;
739 assert_eq!(
740 vals.len(),
741 10,
742 "Oldest data in buffer should cause a flush to disk"
743 );
744 Ok(())
745 }
746
747 #[test(timeouttest)]
750 async fn test_internal_get_batch_expiry() -> TestResult {
751 let server = TestServer::new(line!()).await?;
752 let ipc = server.proxy().await?;
753 for x in 0..25 {
754 ipc.store("modio.test.foo", &x.to_string()).await?;
755 ipc.store("test.foo", &x.to_string()).await?;
756 }
757 let vals = ipc.prepare_modio_datapoints(10).await?;
758 assert_eq!(
759 vals.len(),
760 0,
761 "Data should not automatically be flushed to disk"
762 );
763
764 ipc.store_with_time("modio.test.foo", "26", unixtime() - 200)
765 .await?;
766 let vals = ipc.prepare_modio_datapoints(10).await?;
767 assert_eq!(
768 vals.len(),
769 10,
770 "Oldest data in buffer should cause a flush to disk"
771 );
772 Ok(())
773 }
774
775 #[test(timeouttest)]
776 async fn test_get_batch() -> TestResult {
777 let server = TestServer::new(line!()).await?;
778 let ipc = server.proxy().await?;
779 for x in 0..25 {
780 ipc.store_with_time("test.foo", &x.to_string(), unixtime() - 200)
781 .await?;
782 }
783
784 let vals = ipc.prepare_datapoints(10).await?;
785 assert_eq!(vals.len(), 10);
786 let point = &vals[0];
787 assert_eq!(point.key, "test.foo");
788 assert_eq!(point.value, "0");
789
790 let to_remove: Vec<i64> = vals.iter().map(|x| x.id).collect();
791 ipc.remove_prepared(to_remove).await?;
792
793 let vals = ipc.prepare_datapoints(30).await?;
795 assert_eq!(vals.len(), 15);
796 let point = &vals[0];
797 assert_eq!(point.key, "test.foo");
798 assert_eq!(point.value, "10");
799
800 let to_remove: Vec<i64> = vals.iter().map(|x| x.id).collect();
801 ipc.remove_prepared(to_remove).await?;
802
803 let vals = ipc.prepare_datapoints(30).await?;
804 assert!(vals.is_empty());
805 Ok(())
806 }
807
808 #[test(timeouttest)]
809 async fn test_get_mac() -> TestResult {
813 let server = TestServer::new(line!()).await?;
814 let logger = server.proxy().await?;
815 let res = logger.get_boxid().await?;
816 assert_eq!(res, "000000000000");
817 Ok(())
818 }
819
820 #[test(timeouttest)]
821 async fn test_empty_transactions() -> TestResult {
822 let server = TestServer::new(line!()).await?;
823 let ipc = server.proxy().await?;
824 let transactions = ipc.transaction_get("test").await?;
825 assert_eq!(transactions.len(), 0);
826 let values = ipc.retrieve_all().await?;
827 assert_eq!(values.len(), 0);
828 Ok(())
829 }
830
831 #[test(timeouttest)]
832 async fn test_transaction_get_prefix() -> TestResult {
833 let server = TestServer::new(line!()).await?;
834 let ipc = server.proxy().await?;
835 let transactions = ipc.transaction_get("mbus.").await?;
836 assert_eq!(transactions.len(), 0);
837 Ok(())
838 }
839
840 #[test(timeouttest)]
845 async fn resume_database_test() -> TestResult {
846 let base = Tempbase::default();
847 let dbfile = Tempbase::dbfile();
848 let name = format!("se.modio.logger.TestResumeDb{}", line!());
849
850 let expected = vec![
851 ("test.test.one", "1"),
852 ("test.test.two", "2"),
853 ("test.test.three", "3"),
854 ("test.test.four", "4"),
855 ("test.test.five", "5"),
856 ("test.test.six", "6"),
857 ];
858
859 {
861 let name = name.clone();
862 let base = base.clone();
863 let dbfile = dbfile.clone();
864 let server = TestServer::new_with_base(name, base, dbfile).await?;
865 let logger = server.proxy().await?;
866 info!("Filling datastore");
867 for (key, val) in &expected {
868 logger.store(key, val).await?;
869 }
870 for (key, value) in &expected {
872 let data = logger.retrieve(key).await?;
873 assert_eq!(&data.value, value);
874 assert_eq!(&data.key, key);
875 }
876 info!("Done, settling things");
877 };
878
879 {
881 let server = TestServer::new_with_base(name, base, dbfile).await?;
882 let logger = server.proxy().await?;
883 info!("Checking content after re-start");
884 for (key, value) in &expected {
885 let data = logger.retrieve(key).await?;
886 assert_eq!(&data.value, value);
887 assert_eq!(&data.key, key);
888 }
889 }
890 Ok(())
891 }
892
893 #[allow(clippy::assertions_on_constants)]
898 #[test(timeouttest)]
899 async fn load_and_maintain() -> TestResult {
900 let server = TestServer::new(line!()).await?;
901 let ipc = server.proxy().await?;
902 let logger1 = server.logger1().await?;
903 let submit1 = server.submit1().await?;
904 use crate::testing::proxytest::{Logger1Proxy, Submit1Proxy};
905 use crate::LOGGER_PATH;
906 use async_std::task::{sleep, spawn};
907 use fsipc::legacy::fsipcProxy;
908 use futures::try_join;
909 use std::collections::HashMap;
910 use std::sync::atomic::{AtomicBool, Ordering};
911 use std::sync::Arc;
912 use std::time::Duration;
913
914 let stop = Arc::new(AtomicBool::new(false));
917 const NUM_SLOW_KEYS: usize = 1000;
919 const NUM_TRANSACTIONS: usize = 50;
920 assert!(
921 NUM_TRANSACTIONS < NUM_SLOW_KEYS,
922 "Transactions work on slow keys."
923 );
924 const NUM_BULK_KEYS: usize = 2000;
925 const NUM_SUBMIT_DATA: usize = 200;
927
928 const NUM_CLEAN_LOOPS: usize = 50;
931 const NUM_MAINT_LOOPS: usize = 50;
932
933 async fn trickle_gen(
936 stop: Arc<AtomicBool>,
937 ipc: fsipcProxy<'static>,
938 logger1: Logger1Proxy<'static>,
939 ) -> zbus::Result<()> {
940 loop {
941 for n in 0..NUM_SLOW_KEYS {
942 let key = format!("test.trickle.key.{n}");
943 let name = format!("Name of tricke test key #{n}");
944 let desc = format!("Description {n} of trickle test key #{n}");
945 let val = format!("{n}");
946 logger1.set_metadata_name(&key, &name).await?;
947 logger1.set_metadata_description(&key, &desc).await?;
948 ipc.store(&key, &val).await?;
949 }
950 if stop.load(Ordering::Relaxed) {
951 break Ok(());
952 }
953 }
954 }
955 async fn bulk_gen(
957 stop: Arc<AtomicBool>,
958 logger1: Logger1Proxy<'static>,
959 ) -> zbus::Result<()> {
960 let delay = Duration::from_millis(1);
961 loop {
962 let mut map = HashMap::with_capacity(NUM_BULK_KEYS);
963 for n in 0..=NUM_BULK_KEYS {
964 let key = format!("test.bulk.key.{n}");
965 let val = format!("{n}");
966 map.insert(key, val.into());
967 }
968 logger1.store_batch(map).await?;
969 sleep(delay).await;
970 if stop.load(Ordering::Relaxed) {
971 break Ok(());
972 }
973 }
974 }
975
976 async fn submit_gen(
978 stop: Arc<AtomicBool>,
979 ipc: fsipcProxy<'static>,
980 submit1: Submit1Proxy<'static>,
981 ) -> zbus::Result<()> {
982 let delay = Duration::from_millis(5);
983 loop {
984 if stop.load(Ordering::Relaxed) {
985 break Ok(());
986 }
987 let _meta = submit1.get_all_metadata().await?;
988 let dp = ipc.prepare_datapoints(NUM_SUBMIT_DATA as u32).await?;
989 let to_remove: Vec<_> = dp.iter().map(|m| m.id).collect();
990 if to_remove.is_empty() {
991 sleep(delay).await;
992 } else {
993 ipc.remove_prepared(to_remove).await?;
994 }
995 }
996 }
997
998 async fn tran_gen(stop: Arc<AtomicBool>, ipc: fsipcProxy<'static>) -> zbus::Result<()> {
1000 let delay = Duration::from_millis(5);
1001 let mut t_id = 0;
1002 loop {
1003 sleep(delay).await;
1006 if stop.load(Ordering::Relaxed) {
1007 break Ok(());
1008 }
1009 for n in 0..=NUM_TRANSACTIONS {
1010 t_id += 1;
1011 let transaction_id = format!("{t_id}");
1012 let key = format!("test.trickle.key.{n}");
1013 if let Ok(val) = ipc.retrieve(&key).await {
1015 ipc.transaction_add(&val.key, &val.value, "0", &transaction_id)
1016 .await?;
1017 }
1018 }
1019 }
1020 }
1021
1022 async fn maint_task(stop: Arc<AtomicBool>, conn: zbus::Connection) -> zbus::Result<()> {
1023 let delay = Duration::from_millis(5);
1024 use crate::LOGGER_PATH;
1025 let iface_ref = conn
1026 .object_server()
1027 .interface::<_, Logger>(LOGGER_PATH)
1028 .await?;
1029 for _ in 0..=NUM_MAINT_LOOPS {
1030 sleep(delay).await;
1031 if stop.load(Ordering::Relaxed) {
1032 return Ok(());
1033 }
1034 iface_ref
1035 .get_mut()
1036 .await
1037 .periodic()
1038 .await
1039 .map_err(|e| zbus::Error::Failure(e.to_string()))?;
1043 }
1044 stop.store(true, Ordering::Relaxed);
1045 Ok(())
1046 }
1047
1048 async fn clean_task(stop: Arc<AtomicBool>, conn: zbus::Connection) -> zbus::Result<()> {
1049 let delay = Duration::from_millis(5);
1050 use crate::LOGGER_PATH;
1051 let iface_ref = conn
1052 .object_server()
1053 .interface::<_, Logger>(LOGGER_PATH)
1054 .await?;
1055 let pool = iface_ref.get_mut().await.ds().pool();
1056 for _ in 0..=NUM_CLEAN_LOOPS {
1057 sleep(delay).await;
1058 if stop.load(Ordering::Relaxed) {
1059 return Ok(());
1060 }
1061 Datastore::clean_maintenance(pool.clone())
1062 .await
1063 .map_err(|e| zbus::Error::Failure(e.to_string()))?;
1064 }
1065 stop.store(true, Ordering::Relaxed);
1066 Ok(())
1067 }
1068
1069 let trickle = spawn(trickle_gen(stop.clone(), ipc.clone(), logger1.clone()));
1070 let bulk = spawn(bulk_gen(stop.clone(), logger1.clone()));
1071 let submit = spawn(submit_gen(stop.clone(), ipc.clone(), submit1.clone()));
1072 let trans = spawn(tran_gen(stop.clone(), ipc.clone()));
1073 let maint = spawn(maint_task(stop.clone(), server.conn.clone()));
1074 let cleant = spawn(clean_task(stop.clone(), server.conn.clone()));
1075 let iface_ref = server
1076 .conn
1077 .object_server()
1078 .interface::<_, Logger>(LOGGER_PATH)
1079 .await?;
1080 let _conn4 = iface_ref.get_mut().await.ds().pool().acquire().await?;
1083 let _conn5 = iface_ref.get_mut().await.ds().pool().acquire().await?;
1084 try_join!(trickle, bulk, submit, trans, maint, cleant)?;
1085 stop.store(true, Ordering::Relaxed);
1086
1087 Ok(())
1088 }
1089}