#![allow(clippy::module_name_repetitions)]
mod builder;
mod errors;
pub(crate) mod keys;
mod ping;
use super::timefail;
pub use builder::Builder;
pub use errors::LogErr;
use fsipc::legacy::{PreparedPoint, Transaction};
use modio_logger_db::{fxtime, Datastore};
pub use ping::LoggerPing;
use tracing::{debug, error, info, instrument, warn};
use zbus::{interface, SignalContext};

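/// The logging service exposed over D-Bus as `se.modio.logger.fsipc`.
///
/// It wraps a [`Datastore`] for persistence together with a
/// [`timefail::Timefail`] tracker that records whether the system clock is
/// currently trustworthy.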
#[derive(Clone)]
pub struct Logger {
    ds: Datastore,
    timefail: timefail::Timefail,
}

impl Logger {
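    /// Create a `Logger` on top of an already opened [`Datastore`].
    ///
    /// If the clock is in the TIMEFAIL state, all pending change requests are
    /// failed immediately; transactions left queued by a previous run are
    /// always failed.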
    pub async fn new(timefail: timefail::Timefail, ds: Datastore) -> Result<Self, LogErr> {
        if timefail.is_timefail() {
            info!("Failing all pending change requests due to TIMEFAIL");
            ds.transaction_fail_pending()
                .await
                .inspect_err(|e| error!("Failed to remove pending transactions: {:?}", e))?;
        }
        ds.fail_queued_transactions()
            .await
            .inspect_err(|e| error!("Failed to remove queued transactions: {:?}", e))?;
        Ok(Self { ds, timefail })
    }

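    /// Run one round of periodic maintenance.
    ///
    /// Persists buffered data when the datastore asks for it, re-timestamps
    /// stored metrics after a detected time jump, and logs database
    /// statistics.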
    #[instrument(skip(self))]
    pub async fn periodic(&self) -> Result<(), LogErr> {
        debug!("Doing periodic tasks: persistence, timefail");
        if self.ds.should_persist().await? {
            self.ds
                .persist_data()
                .await
                .inspect_err(|e| error!("Failed to persist datastore: {:?}", e))?;
        };
        if self.timefail.is_adjust() {
            if let Some(adjust) = self.timefail.get_adjust().await {
                info!("Time jump has happened, adjusting data with {adjust}");
                let count = self.ds.fix_timefail(adjust).await?;
                if count > 0 {
                    info!("Adjusted timestamps of {count} metrics");
                };
                self.timefail
                    .remove_adjust()
                    .await
                    .inspect_err(|e| error!("Failed to remove timefail file. err={:?}", e))?;
            }
        }
        let stats = self
            .ds
            .get_statistics()
            .await
            .inspect_err(|e| error!("Failed to gather DB statistics: {:?}", e))?;
        info!("Current stats: {stats}");
        Ok(())
    }
    #[cfg(test)]
    pub const fn ds(&self) -> &Datastore {
        &self.ds
    }

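    /// Entry point for constructing a [`Logger`] through the [`Builder`].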
    #[must_use]
    pub const fn builder() -> Builder {
        Builder::new()
    }
}

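/// Derive the box identity from the MAC address of the `wan` interface.
///
/// Reads `/sys/class/net/wan/address`, falling back to an all-zero address,
/// and returns it lower-cased with every non-hex character stripped
/// (e.g. `"000000000000"` for the fallback).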
async fn get_mac() -> String {
    use async_std::fs;
    let wan = std::path::Path::new("/sys/class/net/wan/address");
    let mut res = fs::read_to_string(wan)
        .await
        .unwrap_or_else(|_| String::from("00:00:00:00:00:00"));
    res.make_ascii_lowercase();
    res.retain(|c| matches!(c, '0'..='9' | 'a'..='f'));
    res
}

#[allow(clippy::use_self)]
#[interface(name = "se.modio.logger.fsipc")]
impl Logger {
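    /// Liveness probe; always answers `"Ping? Pong"`.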
    #[allow(clippy::unused_self)]
    const fn ping(&self) -> &str {
        "Ping? Pong"
    }
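    /// Check whether `key` passes the key validation rules in [`keys`].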
    #[allow(clippy::unused_self)]
    fn valid_key(&mut self, key: &str) -> bool {
        keys::valid_key(key).is_ok()
    }

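    /// Return the box identity derived from the `wan` MAC address.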
    #[allow(clippy::unused_self)]
    async fn get_boxid(&self) -> String {
        get_mac().await
    }

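    /// Fetch the most recent datapoint stored for `key`.
    ///
    /// The value is returned as a one-element tuple, matching the legacy
    /// `fsipc` method signature.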
    #[instrument(skip(self))]
    async fn retrieve(&mut self, key: &str) -> Result<(fsipc::legacy::Measure,), LogErr> {
        keys::valid_key(key)?;
        let dat = self
            .ds
            .get_last_datapoint(key)
            .await
            .inspect_err(|e| debug!("Failed to retrieve last datapoint. err={:?}", e))?;
        let val = fsipc::legacy::Measure::from(dat);
        Ok((val,))
    }

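    /// Return the latest stored datapoint for every known key.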
    #[instrument(skip(self))]
    async fn retrieve_all(&mut self) -> Result<Vec<fsipc::legacy::Measure>, LogErr> {
        let result = self
            .ds
            .get_latest_logdata()
            .await
            .inspect_err(|e| error!("Failed to get latest logdata. err={:?}", e))?;
        let res: Vec<fsipc::legacy::Measure> = result
            .into_iter()
            .map(fsipc::legacy::Measure::from)
            .collect();
        Ok(res)
    }

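    /// Signal emitted after a successful store for non-internal keys
    /// (keys not starting with `modio.`).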
    #[zbus(signal)]
    async fn store_signal(
        ctxt: &SignalContext<'_>,
        key: &str,
        value: &str,
        when: u64,
    ) -> zbus::Result<()>;

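    /// Store `value` for `key`, stamped with the current time.
    ///
    /// The datapoint is flagged when the clock is in the TIMEFAIL state, and a
    /// `store_signal` is emitted unless the key is an internal `modio.` key.
    ///
    /// Illustrative client-side use over the test proxy (see the tests below):
    ///
    /// ```ignore
    /// proxy.store("test.key", "abc123").await?;
    /// let m = proxy.retrieve("test.key").await?;
    /// assert_eq!(m.value, "abc123");
    /// ```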
    #[instrument(skip_all)]
    async fn store(
        &self,
        #[zbus(signal_context)] ctxt: SignalContext<'_>,
        key: &str,
        value: &str,
    ) -> Result<(), LogErr> {
        keys::valid_key(key)?;
        let when = fxtime();
        let timefail = self.timefail.is_timefail();
        self.ds
            .insert(key, value, when, timefail)
            .await
            .inspect_err(|e| error!("Failed to store single metric. err={:?}", e))?;

        #[allow(
            clippy::cast_precision_loss,
            clippy::cast_sign_loss,
            clippy::cast_possible_truncation
        )]
        let trunc_when = when as u64;

        if !key.starts_with("modio.") {
            Self::store_signal(&ctxt, key, value, trunc_when).await?;
        };
        Ok(())
    }

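    /// Store `value` for `key` with a caller-supplied Unix timestamp.
    ///
    /// Since the caller provides the timestamp, the datapoint is never marked
    /// as affected by a time failure.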
    #[instrument(skip_all)]
    async fn store_with_time(
        &mut self,
        #[zbus(signal_context)] ctxt: SignalContext<'_>,
        key: &str,
        value: &str,
        when: u64,
    ) -> Result<(), LogErr> {
        keys::valid_key(key)?;
        let timefail = false;

        #[allow(clippy::cast_precision_loss)]
        let db_when = when as f64;

        self.ds
            .insert(key, value, db_when, timefail)
            .await
            .inspect_err(|e| error!("Failed to store single metric with timestamp. err={:?}", e))?;
        if !key.starts_with("modio.") {
            Self::store_signal(&ctxt, key, value, when).await?;
        };
        Ok(())
    }

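    /// Signal emitted when a new transaction (change request) has been queued.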
    #[zbus(signal)]
    async fn transaction_added(ctxt: &SignalContext<'_>, key: &str) -> zbus::Result<()>;

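    /// Queue a change request for `key`, identified by a caller-chosen `token`.
    ///
    /// Re-adding an already known token is accepted and ignored for backwards
    /// compatibility. A sketch of the round trip, as exercised by the tests
    /// below:
    ///
    /// ```ignore
    /// ipc.transaction_add("test.key", "expected", "target", "token-1").await?;
    /// for t in ipc.transaction_get("test.").await? {
    ///     ipc.transaction_pass(t.t_id).await?;
    /// }
    /// ```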
    #[instrument(skip_all)]
    async fn transaction_add(
        &mut self,
        #[zbus(signal_context)] ctxt: SignalContext<'_>,
        key: &str,
        expected: &str,
        target: &str,
        token: &str,
    ) -> Result<(), LogErr> {
        keys::valid_key(key)?;
        keys::valid_token(token)?;
        if self
            .ds
            .has_transaction(token)
            .await
            .inspect_err(|e| error!("Failed to check for transaction for key. err={:?}", e))?
        {
            warn!(
                "Duplicate transaction (key: {}, token: {}); ignoring for backwards compatibility.",
                key, token
            );
            return Ok(());
        }
        self.ds
            .transaction_add(key, expected, target, token)
            .await
            .inspect_err(|e| error!("Failed to add transaction for key. err={:?}", e))?;
        Self::transaction_added(&ctxt, key).await?;
        Ok(())
    }

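    /// List the open transactions whose keys start with `prefix`.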
    #[instrument(skip(self))]
    async fn transaction_get(&mut self, prefix: &str) -> Result<Vec<Transaction>, LogErr> {
        debug!("Retrieving transactions beginning with {}", prefix);
        let res = self
            .ds
            .transaction_get(prefix)
            .await
            .inspect_err(|e| error!("Failed to get transaction for key. err={:?}", e))?;

        let res: Vec<Transaction> = res.into_iter().map(Transaction::from).collect();
        Ok(res)
    }

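    /// Mark transaction `t_id` as failed.
    ///
    /// Returns [`LogErr::TransactionNotFound`] if no open transaction has that
    /// id.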
    #[instrument(skip(self))]
    async fn transaction_fail(&mut self, t_id: u64) -> Result<(), LogErr> {
        debug!("Marking transaction: t_id={}, failed", t_id);
        let timefail = self.timefail.is_timefail();

        #[allow(clippy::cast_possible_wrap)]
        let t_id = t_id as i64;
        let count = self
            .ds
            .transaction_fail(t_id, timefail)
            .await
            .inspect_err(|e| error!("Failed to mark transaction as failed. err={:?}", e))?;
        if count > 0 {
            Ok(())
        } else {
            Err(LogErr::TransactionNotFound)
        }
    }

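    /// Mark transaction `t_id` as passed.
    ///
    /// Returns [`LogErr::TransactionNotFound`] if no open transaction has that
    /// id.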
    #[instrument(skip(self))]
    async fn transaction_pass(&mut self, t_id: u64) -> Result<(), LogErr> {
        debug!("Marking transaction: t_id={}, passed", t_id);
        let timefail = self.timefail.is_timefail();
        #[allow(clippy::cast_possible_wrap)]
        let t_id = t_id as i64;
        let count = self
            .ds
            .transaction_pass(t_id, timefail)
            .await
            .inspect_err(|e| error!("Failed to mark transaction as passed. err={:?}", e))?;
        if count > 0 {
            Ok(())
        } else {
            Err(LogErr::TransactionNotFound)
        }
    }

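    /// Fetch up to `maximum` persisted datapoints for submission upstream.
    ///
    /// `maximum` must be in `1..=250`. The points remain in the database and
    /// are returned again by later calls until acknowledged with
    /// `remove_prepared`, as in this sketch taken from the tests below:
    ///
    /// ```ignore
    /// let batch = ipc.prepare_datapoints(10).await?;
    /// let ids: Vec<i64> = batch.iter().map(|p| p.id).collect();
    /// ipc.remove_prepared(ids).await?;
    /// ```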
    #[instrument(skip(self))]
    async fn prepare_datapoints(&mut self, maximum: u32) -> Result<Vec<PreparedPoint>, LogErr> {
        prepare_range_check(maximum)?;
        let data = self
            .ds
            .get_batch(maximum)
            .await
            .inspect_err(|e| error!("Failed to get batch of datapoints. err={:?}", e))?;
        let result: Vec<PreparedPoint> = data.into_iter().map(PreparedPoint::from).collect();
        Ok(result)
    }

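    /// Like `prepare_datapoints`, but only for internal `modio.`-prefixed keys.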
    #[instrument(skip(self))]
    async fn prepare_modio_datapoints(
        &mut self,
        maximum: u32,
    ) -> Result<Vec<PreparedPoint>, LogErr> {
        prepare_range_check(maximum)?;
        let data = self
            .ds
            .get_internal_batch(maximum)
            .await
            .inspect_err(|e| error!("Failed to get_internal_batch. err={:?}", e))?;
        let result: Vec<PreparedPoint> = data.into_iter().map(PreparedPoint::from).collect();
        Ok(result)
    }

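    /// Acknowledge previously prepared datapoints by deleting them by id.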
    #[instrument(skip_all)]
    async fn remove_prepared(&mut self, items: Vec<i64>) -> Result<(), LogErr> {
        prepare_remove_check(&items)?;
        self.ds
            .drop_batch(&items)
            .await
            .inspect_err(|e| error!("Failed to delete batch of submitted items. err={:?}", e))?;
        Ok(())
    }
}

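/// Validate an id list passed to `remove_prepared`: it must be non-empty and
/// contain no negative ids.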
fn prepare_remove_check(items: &[i64]) -> Result<(), PreparedError> {
    if items.is_empty() {
        return Err(PreparedError::Empty);
    }
    if items.iter().any(|x| *x < 0_i64) {
        return Err(PreparedError::InvalidIndex);
    }
    Ok(())
}

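/// Validation errors for the prepare/remove batch API.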
#[derive(thiserror::Error, Debug)]
pub enum PreparedError {
    #[error("Too few items")]
    TooSmall,
    #[error("Too many items")]
    TooMany,
    #[error("Empty item set")]
    Empty,
    #[error("Invalid index")]
    InvalidIndex,
}

impl From<PreparedError> for LogErr {
    fn from(e: PreparedError) -> Self {
        Self::InvalidPrepared(e.to_string())
    }
}

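/// Validate the `maximum` argument of the prepare calls; accepted batch sizes
/// are 1 through 250 datapoints.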
const fn prepare_range_check(num: u32) -> Result<(), PreparedError> {
    match num {
        0 => Err(PreparedError::TooSmall),
        1..=250 => Ok(()),
        _ => Err(PreparedError::TooMany),
    }
}

#[cfg(test)]
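/// End-to-end tests that exercise the `fsipc` interface through a
/// `TestServer` and its D-Bus proxies, backed by a temporary database.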
pub mod tests {
    use super::*;

    use crate::testing::{Tempbase, TestServer};
    use fsipc::unixtime;
    use std::error::Error;
    use test_log::test;
    use timeout_macro::timeouttest;

    type TestResult = Result<(), Box<dyn Error>>;

    use futures_util::{FutureExt, StreamExt};

    #[test(timeouttest)]
    async fn ping_pong_test() -> TestResult {
        let server = TestServer::new(line!()).await?;
        let proxy = server.proxy().await?;
        let first = proxy.ping().await?;
        let second = proxy.ping().await?;
        assert_eq!(first, "Ping? Pong");
        assert_eq!(second, "Ping? Pong");
        Ok(())
    }

    #[test(timeouttest)]
    async fn done_gives_error_test() -> TestResult {
        let server = TestServer::new(line!()).await?;
        let proxy = server.proxy().await?;
        let res = proxy.done().await;
        assert!(res.is_err());
        Ok(())
    }

    #[test(timeouttest)]
    async fn store_retrieve() -> TestResult {
        let server = TestServer::new(line!()).await?;
        let proxy = server.proxy().await?;
        proxy.store("test.key", "abc123").await?;
        let m = proxy.retrieve("test.key").await?;
        assert_eq!(m.key, "test.key");
        assert_eq!(m.value, "abc123");
        Ok(())
    }

    #[test(timeouttest)]
    async fn store_buffer() -> TestResult {
        let server = TestServer::new(line!()).await?;
        let proxy = server.proxy().await?;
        proxy
            .store_with_time("test.key", "abc123", 1_494_602_107)
            .await?;
        proxy
            .store_with_time("test.key", "abc1234", 1_494_602_108)
            .await?;
        let m = proxy.retrieve("test.key").await?;
        assert_eq!(m.key, "test.key");
        assert_eq!(m.value, "abc1234");
        assert_eq!(m.timestamp, 1_494_602_108);
        Ok(())
    }

    #[test(timeouttest)]
    async fn retrieve_all_test() -> TestResult {
        let server = TestServer::new(line!()).await?;
        let proxy = server.proxy().await?;
        debug!("I have a party here");
        proxy
            .store_with_time("test.key", "abc123", 1_494_602_107)
            .await?;
        proxy
            .store_with_time("test.key", "abc1234", 1_494_602_108)
            .await?;
        proxy.store("test.key2", "abcdefg").await?;

        let all = proxy.retrieve_all().await?;
        assert_eq!(all.len(), 2);
        let m0 = all.first().expect("Should have value");
        assert_eq!(m0.key, "test.key");
        assert_eq!(m0.value, "abc1234");
        assert_eq!(m0.timestamp, 1_494_602_108);
        let m1 = all.get(1).expect("Should have value");
        assert_eq!(m1.key, "test.key2");
        assert_eq!(m1.value, "abcdefg");
        Ok(())
    }

    #[test(timeouttest)]
    async fn transaction_adding_test() -> TestResult {
        let server = TestServer::new(line!()).await?;
        let proxy = server.proxy().await?;
        proxy
            .transaction_add("test.test.one", "first", "second", "012")
            .await?;
        proxy
            .transaction_add("dummy.test.one", "should not", "be present", "013")
            .await?;
        let transactions = proxy.transaction_get("test.test").await?;
        assert_eq!(transactions.len(), 1);
        let res = &transactions[0];
        assert_eq!(res.key, "test.test.one");
        assert_eq!(res.t_id, 1, "Transaction ID mismatch");
        Ok(())
    }

    #[test(timeouttest)]
    async fn transaction_dupe_adding_test() -> TestResult {
        let server = TestServer::new(line!()).await?;
        let proxy = server.proxy().await?;
        proxy
            .transaction_add("test.test.one", "first", "second", "1638290048")
            .await?;
        let res = proxy
            .transaction_add("test.test.one", "first", "second", "1638290048")
            .await;
        res.expect("duplicated tokens should not cause error");
        let transactions = proxy.transaction_get("test.test").await?;
        assert_eq!(transactions.len(), 1);
        Ok(())
    }

    #[test(timeouttest)]
    async fn transaction_signal_test() -> TestResult {
        let server = TestServer::new(line!()).await?;
        let logger = server.proxy().await?;
        let mut stream = logger.receive_transaction_added().await?;
        logger
            .transaction_add("test.test.transaction_signal", "first", "second", "012")
            .await?;

        let signal = stream.next().await.unwrap();
        let payload = signal.args()?;
        assert_eq!(payload.key, "test.test.transaction_signal");
        Ok(())
    }

    #[test(timeouttest)]
    async fn transaction_passing_test() -> TestResult {
        let server = TestServer::new(line!()).await?;
        let logger = server.proxy().await?;
        logger
            .transaction_add("test.test.one", "first", "second", "012")
            .await?;
        logger
            .transaction_add("test.test.two", "uno", "dos", "0113")
            .await?;
        let trans = logger.transaction_get("test.test").await?;
        logger.transaction_fail(trans[0].t_id).await?;
        logger.transaction_pass(trans[1].t_id).await?;
        logger
            .transaction_add("test.test.three", "etta", "tvåa", "0114")
            .await?;
        let transactions = logger.transaction_get("test.test").await?;
        assert_eq!(transactions.len(), 1);
        let res = &transactions[0];
        assert_eq!(res.key, "test.test.three");
        assert_eq!(res.t_id, 3, "Transaction id mismatch");
        Ok(())
    }

    #[test(timeouttest)]
    async fn retrieving_data_test() -> TestResult {
        let server = TestServer::new(line!()).await?;
        let ipc = server.proxy().await?;
        ipc.store("test.test.one", "first").await?;
        ipc.store("test.test.one", "second").await?;
        ipc.store("test.test.one", "third").await?;

        ipc.store("test.test.two", "1").await?;
        ipc.store("test.test.two", "2").await?;
        ipc.store("test.test.two", "3").await?;

        let res = ipc.retrieve_all().await?;
        for measure in &res {
            let data = ipc.retrieve(&measure.key).await?;
            assert_eq!(data.key, measure.key);
        }
        Ok(())
    }

    #[test(timeouttest)]
    async fn valid_key_test() -> TestResult {
        let server = TestServer::new(line!()).await?;
        let ipc = server.proxy().await?;
        assert!(ipc.valid_key("modio.software.development").await?);
        assert!(ipc.valid_key("abc").await?);
        assert!(ipc.valid_key("a.b.c").await?);
        assert!(ipc.valid_key("a_b.c").await?);
        Ok(())
    }

    #[test(timeouttest)]
    async fn invalid_key_test() -> TestResult {
        let server = TestServer::new(line!()).await?;
        let ipc = server.proxy().await?;
        assert!(!ipc.valid_key("modio..invalid").await?);
        assert!(!ipc.valid_key(".modio..invalid").await?);
        assert!(!ipc.valid_key("modio.invalid.").await?);
        assert!(!ipc.valid_key("modio. invalid").await?);
        assert!(!ipc.valid_key("modio.in valid").await?);
        assert!(!ipc.valid_key("modio.invalid ").await?);
        assert!(!ipc.valid_key(" modio.invalid").await?);
        Ok(())
    }

    #[test(timeouttest)]
    async fn transaction_double() -> TestResult {
        let server = TestServer::new(line!()).await?;
        let ipc = server.proxy().await?;
        let key = "test.test.one";
        let first = "first";
        let second = "second";
        let guid4 = zbus::Guid::generate();
        ipc.store(key, first).await?;

        ipc.transaction_add(key, first, second, guid4.as_str())
            .await?;
        let transactions = ipc.transaction_get(key).await?;
        let first_transaction = transactions
            .first()
            .expect("Should have at least one transaction");
        let res = ipc.transaction_pass(first_transaction.t_id).await;
        assert!(res.is_ok());
        let res = ipc.transaction_pass(first_transaction.t_id).await;
        assert!(res.is_err());
        Ok(())
    }

    #[test(timeouttest)]
    async fn transaction_tests() -> TestResult {
        let server = TestServer::new(line!()).await?;
        let ipc = server.proxy().await?;
        let key = "test.test.one";
        let first = "first";
        let mut our_value = "first";
        let second = "second";

        ipc.store(key, our_value).await?;
        let when = unixtime();

        ipc.store_with_time(key, our_value, when).await?;

        let guid1 = zbus::Guid::generate();
        ipc.transaction_add(key, first, second, guid1.as_str())
            .await?;
        let guid2 = zbus::Guid::generate();
        ipc.transaction_add(key, first, second, guid2.as_str())
            .await?;
        let guid3 = zbus::Guid::generate();
        ipc.transaction_add(key, first, second, guid3.as_str())
            .await?;

        let transactions = ipc.transaction_get(key).await?;
        for trn in &transactions {
            if trn.key == key {
                if our_value == trn.expected {
                    our_value = &trn.target;
                    ipc.transaction_pass(trn.t_id).await?;
                } else {
                    ipc.transaction_fail(trn.t_id).await?;
                }
            }
            ipc.store(key, our_value).await?;
        }
        let res = ipc.transaction_get(key).await?;
        assert_eq!(res.len(), 0);

        Ok(())
    }

    #[test(timeouttest)]
    async fn no_modio_signals() -> TestResult {
        let server = TestServer::new(line!()).await?;
        let ipc = server.proxy().await?;
        let mut stream = ipc.receive_store_signal().await?;
        ipc.store("test.test.test", "value").await?;
        let first = stream.next().await.unwrap();
        let payload = first.args()?;
        assert_eq!(payload.key, "test.test.test");

        ipc.store("modio.test.test", "value").await?;
        let second = stream.next().now_or_never();
        assert!(second.is_none());
        Ok(())
    }

    #[test(timeouttest)]
    async fn submit_consume() -> TestResult {
        let server = TestServer::new(line!()).await?;
        let ipc = server.proxy().await?;
        for x in 0..5 {
            ipc.store_with_time("test.foo", &x.to_string(), unixtime() - 100)
                .await?;
        }
        let vals = ipc.prepare_datapoints(10).await?;
        assert_eq!(vals.len(), 5);

        let point = &vals[0];
        assert_eq!(point.key, "test.foo");
        assert_eq!(point.value, "0");

        let more = ipc.prepare_datapoints(10).await?;
        assert_eq!(more.len(), 5);

        let last = &vals[4];
        assert_eq!(last.key, "test.foo");
        assert_eq!(last.value, "4");

        let to_remove: Vec<_> = vals.iter().map(|m| m.id).collect();

        ipc.remove_prepared(to_remove).await?;
        let after = ipc.prepare_datapoints(30).await?;
        assert!(after.is_empty());
        Ok(())
    }

    #[test(timeouttest)]
    async fn submit_modio_consume() -> TestResult {
        let server = TestServer::new(line!()).await?;
        let ipc = server.proxy().await?;
        for x in 0..5 {
            ipc.store_with_time("test.foo", &x.to_string(), unixtime() - 100)
                .await?;
            ipc.store_with_time("modio.test.foo", &x.to_string(), unixtime() - 100)
                .await?;
        }

        let vals = ipc.prepare_modio_datapoints(20).await?;
        assert_eq!(vals.len(), 5);
        for point in &vals {
            assert_eq!(&point.key, "modio.test.foo");
        }

        let more = ipc.prepare_modio_datapoints(10).await?;
        assert_eq!(more.len(), 5);

        let to_remove: Vec<i64> = vals.iter().map(|x| x.id).collect();
        ipc.remove_prepared(to_remove).await?;

        let modio_after = ipc.prepare_modio_datapoints(30).await?;
        assert!(modio_after.is_empty());

        let after = ipc.prepare_datapoints(30).await?;
        assert!(!after.is_empty());
        Ok(())
    }

    #[test(timeouttest)]
    async fn test_get_batch_expiry() -> TestResult {
        let server = TestServer::new(line!()).await?;
        let ipc = server.proxy().await?;
        for x in 0..25 {
            ipc.store("test.foo", &x.to_string()).await?;
        }
        let vals = ipc.prepare_datapoints(10).await?;
        assert_eq!(
            vals.len(),
            0,
            "Data should not automatically be flushed to disk"
        );

        ipc.store_with_time("test.foo", "26", unixtime() - 200)
            .await?;
        let vals = ipc.prepare_datapoints(10).await?;
        assert_eq!(
            vals.len(),
            10,
            "Oldest data in buffer should cause a flush to disk"
        );
        Ok(())
    }

    #[test(timeouttest)]
    async fn test_internal_get_batch_expiry() -> TestResult {
        let server = TestServer::new(line!()).await?;
        let ipc = server.proxy().await?;
        for x in 0..25 {
            ipc.store("modio.test.foo", &x.to_string()).await?;
            ipc.store("test.foo", &x.to_string()).await?;
        }
        let vals = ipc.prepare_modio_datapoints(10).await?;
        assert_eq!(
            vals.len(),
            0,
            "Data should not automatically be flushed to disk"
        );

        ipc.store_with_time("modio.test.foo", "26", unixtime() - 200)
            .await?;
        let vals = ipc.prepare_modio_datapoints(10).await?;
        assert_eq!(
            vals.len(),
            10,
            "Oldest data in buffer should cause a flush to disk"
        );
        Ok(())
    }

    #[test(timeouttest)]
    async fn test_get_batch() -> TestResult {
        let server = TestServer::new(line!()).await?;
        let ipc = server.proxy().await?;
        for x in 0..25 {
            ipc.store_with_time("test.foo", &x.to_string(), unixtime() - 200)
                .await?;
        }

        let vals = ipc.prepare_datapoints(10).await?;
        assert_eq!(vals.len(), 10);
        let point = &vals[0];
        assert_eq!(point.key, "test.foo");
        assert_eq!(point.value, "0");

        let to_remove: Vec<i64> = vals.iter().map(|x| x.id).collect();
        ipc.remove_prepared(to_remove).await?;

        let vals = ipc.prepare_datapoints(30).await?;
        assert_eq!(vals.len(), 15);
        let point = &vals[0];
        assert_eq!(point.key, "test.foo");
        assert_eq!(point.value, "10");

        let to_remove: Vec<i64> = vals.iter().map(|x| x.id).collect();
        ipc.remove_prepared(to_remove).await?;

        let vals = ipc.prepare_datapoints(30).await?;
        assert!(vals.is_empty());
        Ok(())
    }

    #[test(timeouttest)]
    async fn test_get_mac() -> TestResult {
        let server = TestServer::new(line!()).await?;
        let logger = server.proxy().await?;
        let res = logger.get_boxid().await?;
        assert_eq!(res, "000000000000");
        Ok(())
    }

    #[test(timeouttest)]
    async fn test_empty_transactions() -> TestResult {
        let server = TestServer::new(line!()).await?;
        let ipc = server.proxy().await?;
        let transactions = ipc.transaction_get("test").await?;
        assert_eq!(transactions.len(), 0);
        let values = ipc.retrieve_all().await?;
        assert_eq!(values.len(), 0);
        Ok(())
    }

    #[test(timeouttest)]
    async fn test_transaction_get_prefix() -> TestResult {
        let server = TestServer::new(line!()).await?;
        let ipc = server.proxy().await?;
        let transactions = ipc.transaction_get("mbus.").await?;
        assert_eq!(transactions.len(), 0);
        Ok(())
    }

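    /// Data stored through one server instance must still be readable after a
    /// second server is started on the same database file.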
    #[test(timeouttest)]
    async fn resume_database_test() -> TestResult {
        let base = Tempbase::default();
        let dbfile = Tempbase::dbfile();
        let name = format!("se.modio.logger.TestResumeDb{}", line!());

        let expected = vec![
            ("test.test.one", "1"),
            ("test.test.two", "2"),
            ("test.test.three", "3"),
            ("test.test.four", "4"),
            ("test.test.five", "5"),
            ("test.test.six", "6"),
        ];

        {
            let name = name.clone();
            let base = base.clone();
            let dbfile = dbfile.clone();
            let server = TestServer::new_with_base(name, base, dbfile).await?;
            let logger = server.proxy().await?;
            info!("Filling datastore");
            for (key, val) in &expected {
                logger.store(key, val).await?;
            }
            for (key, value) in &expected {
                let data = logger.retrieve(key).await?;
                assert_eq!(&data.value, value);
                assert_eq!(&data.key, key);
            }
            info!("Done, settling things");
        };

        {
            let server = TestServer::new_with_base(name, base, dbfile).await?;
            let logger = server.proxy().await?;
            info!("Checking content after re-start");
            for (key, value) in &expected {
                let data = logger.retrieve(key).await?;
                assert_eq!(&data.value, value);
                assert_eq!(&data.key, key);
            }
        }
        Ok(())
    }

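    /// Stress test: run trickle, bulk, submit, transaction, maintenance and
    /// cleanup tasks concurrently against one server until the maintenance or
    /// cleanup loop finishes and flags the other tasks to stop.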
    #[allow(clippy::assertions_on_constants)]
    #[test(timeouttest)]
    async fn load_and_maintain() -> TestResult {
        let server = TestServer::new(line!()).await?;
        let ipc = server.proxy().await?;
        let logger1 = server.logger1().await?;
        let submit1 = server.submit1().await?;
        use crate::testing::proxytest::{Logger1Proxy, Submit1Proxy};
        use crate::LOGGER_PATH;
        use async_std::task::{sleep, spawn};
        use fsipc::legacy::fsipcProxy;
        use futures::try_join;
        use std::collections::HashMap;
        use std::sync::atomic::{AtomicBool, Ordering};
        use std::sync::Arc;
        use std::time::Duration;

        let stop = Arc::new(AtomicBool::new(false));
        const NUM_SLOW_KEYS: usize = 1000;
        const NUM_TRANSACTIONS: usize = 50;
        assert!(
            NUM_TRANSACTIONS < NUM_SLOW_KEYS,
            "Transactions work on slow keys."
        );
        const NUM_BULK_KEYS: usize = 2000;
        const NUM_SUBMIT_DATA: usize = 200;

        const NUM_CLEAN_LOOPS: usize = 50;
        const NUM_MAINT_LOOPS: usize = 50;

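        /// Continuously store single values and metadata for a large set of
        /// slowly changing keys until `stop` is set.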
        async fn trickle_gen(
            stop: Arc<AtomicBool>,
            ipc: fsipcProxy<'static>,
            logger1: Logger1Proxy<'static>,
        ) -> zbus::Result<()> {
            loop {
                for n in 0..NUM_SLOW_KEYS {
                    let key = format!("test.trickle.key.{n}");
                    let name = format!("Name of trickle test key #{n}");
                    let desc = format!("Description {n} of trickle test key #{n}");
                    let val = format!("{n}");
                    logger1.set_metadata_name(&key, &name).await?;
                    logger1.set_metadata_description(&key, &desc).await?;
                    ipc.store(&key, &val).await?;
                }
                if stop.load(Ordering::Relaxed) {
                    break Ok(());
                }
            }
        }
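        /// Repeatedly push a large batch of keys through `store_batch` until
        /// `stop` is set.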
        async fn bulk_gen(
            stop: Arc<AtomicBool>,
            logger1: Logger1Proxy<'static>,
        ) -> zbus::Result<()> {
            let delay = Duration::from_millis(1);
            loop {
                let mut map = HashMap::with_capacity(NUM_BULK_KEYS);
                for n in 0..=NUM_BULK_KEYS {
                    let key = format!("test.bulk.key.{n}");
                    let val = format!("{n}");
                    map.insert(key, val.into());
                }
                logger1.store_batch(map).await?;
                sleep(delay).await;
                if stop.load(Ordering::Relaxed) {
                    break Ok(());
                }
            }
        }

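        /// Play the upstream submitter: fetch metadata, prepare datapoints and
        /// acknowledge them until `stop` is set.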
        async fn submit_gen(
            stop: Arc<AtomicBool>,
            ipc: fsipcProxy<'static>,
            submit1: Submit1Proxy<'static>,
        ) -> zbus::Result<()> {
            let delay = Duration::from_millis(5);
            loop {
                if stop.load(Ordering::Relaxed) {
                    break Ok(());
                }
                let _meta = submit1.get_all_metadata().await?;
                let dp = ipc.prepare_datapoints(NUM_SUBMIT_DATA as u32).await?;
                let to_remove: Vec<_> = dp.iter().map(|m| m.id).collect();
                if to_remove.is_empty() {
                    sleep(delay).await;
                } else {
                    ipc.remove_prepared(to_remove).await?;
                }
            }
        }

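        /// Keep adding transactions against the trickle keys until `stop` is
        /// set.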
        async fn tran_gen(stop: Arc<AtomicBool>, ipc: fsipcProxy<'static>) -> zbus::Result<()> {
            let delay = Duration::from_millis(5);
            let mut t_id = 0;
            loop {
                sleep(delay).await;
                if stop.load(Ordering::Relaxed) {
                    break Ok(());
                }
                for n in 0..=NUM_TRANSACTIONS {
                    t_id += 1;
                    let transaction_id = format!("{t_id}");
                    let key = format!("test.trickle.key.{n}");
                    if let Ok(val) = ipc.retrieve(&key).await {
                        ipc.transaction_add(&val.key, &val.value, "0", &transaction_id)
                            .await?;
                    }
                }
            }
        }

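        /// Drive `Logger::periodic` in a loop, then set `stop` to end the
        /// test.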
        async fn maint_task(stop: Arc<AtomicBool>, conn: zbus::Connection) -> zbus::Result<()> {
            let delay = Duration::from_millis(5);
            use crate::LOGGER_PATH;
            let iface_ref = conn
                .object_server()
                .interface::<_, Logger>(LOGGER_PATH)
                .await?;
            for _ in 0..=NUM_MAINT_LOOPS {
                sleep(delay).await;
                if stop.load(Ordering::Relaxed) {
                    return Ok(());
                }
                iface_ref
                    .get_mut()
                    .await
                    .periodic()
                    .await
                    .map_err(|e| zbus::Error::Failure(e.to_string()))?;
            }
            stop.store(true, Ordering::Relaxed);
            Ok(())
        }

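        /// Drive `Datastore::clean_maintenance` in a loop, then set `stop` to
        /// end the test.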
        async fn clean_task(stop: Arc<AtomicBool>, conn: zbus::Connection) -> zbus::Result<()> {
            let delay = Duration::from_millis(5);
            use crate::LOGGER_PATH;
            let iface_ref = conn
                .object_server()
                .interface::<_, Logger>(LOGGER_PATH)
                .await?;
            let pool = iface_ref.get_mut().await.ds().pool();
            for _ in 0..=NUM_CLEAN_LOOPS {
                sleep(delay).await;
                if stop.load(Ordering::Relaxed) {
                    return Ok(());
                }
                Datastore::clean_maintenance(pool.clone())
                    .await
                    .map_err(|e| zbus::Error::Failure(e.to_string()))?;
            }
            stop.store(true, Ordering::Relaxed);
            Ok(())
        }

        let trickle = spawn(trickle_gen(stop.clone(), ipc.clone(), logger1.clone()));
        let bulk = spawn(bulk_gen(stop.clone(), logger1.clone()));
        let submit = spawn(submit_gen(stop.clone(), ipc.clone(), submit1.clone()));
        let trans = spawn(tran_gen(stop.clone(), ipc.clone()));
        let maint = spawn(maint_task(stop.clone(), server.conn.clone()));
        let cleant = spawn(clean_task(stop.clone(), server.conn.clone()));
        let iface_ref = server
            .conn
            .object_server()
            .interface::<_, Logger>(LOGGER_PATH)
            .await?;
        let _conn4 = iface_ref.get_mut().await.ds().pool().acquire().await?;
        let _conn5 = iface_ref.get_mut().await.ds().pool().acquire().await?;
        try_join!(trickle, bulk, submit, trans, maint, cleant)?;
        stop.store(true, Ordering::Relaxed);

        Ok(())
    }
}