// mocklogger/logger.rs

1// Author: D.S. Ljungmark <spider@skuggor.se>, Modio AB
2// SPDX-License-Identifier: AGPL-3.0-or-later
3#![allow(clippy::module_name_repetitions)]
4mod builder;
5mod errors;
6pub(crate) mod keys;
7mod ping;
8use super::timefail;
9pub use builder::Builder;
10pub use errors::LogErr;
11use fsipc::legacy::{PreparedPoint, Transaction};
12use modio_logger_db::{fxtime, Datastore};
13pub use ping::LoggerPing;
14use tracing::{debug, error, info, instrument, warn};
15use zbus::{interface, object_server::SignalEmitter};
16
/// DBus-facing logger service, backed by a [`Datastore`] plus a timefail
/// tracker used to detect and compensate for bad system time.
#[derive(Clone)]
pub struct Logger {
    // Persistent storage for metrics and transactions.
    ds: Datastore,
    // Tracks whether system time is known-bad and any pending adjustment.
    timefail: timefail::Timefail,
}
22
impl Logger {
    /// Build a new `Logger`, cleaning up stale transaction state first.
    ///
    /// When the clock is known-bad (timefail), all pending change requests
    /// are failed since their state cannot be trusted. Queued transactions
    /// are always failed on startup.
    ///
    /// # Errors
    /// Returns `LogErr` if either datastore cleanup step fails.
    pub async fn new(timefail: timefail::Timefail, ds: Datastore) -> Result<Self, LogErr> {
        if timefail.is_timefail() {
            info!("Failing all pending change requests due to TIMEFAIL");
            ds.transaction_fail_pending()
                .await
                .inspect_err(|e| error!("Failed to remove pending transactions: {:?}", e))?;
        }
        ds.fail_queued_transactions()
            .await
            .inspect_err(|e| error!("Failed to remove queued transactions: {:?}", e))?;
        Ok(Self { ds, timefail })
    }

    /// Periodic maintenance: persist buffered data when due, apply any
    /// pending time adjustment, and log datastore statistics.
    ///
    /// # Errors
    /// Returns `LogErr` if persistence, timestamp fixup, or statistics
    /// gathering fails.
    #[instrument(skip(self))]
    pub async fn periodic(&self) -> Result<(), LogErr> {
        debug!("Doing periodic tasks, persistance, timefail");
        if self.ds.should_persist().await? {
            self.ds
                .persist_data()
                .await
                .inspect_err(|e| error!("Failed to persist datastore: {:?}", e))?;
        };
        if self.timefail.is_adjust() {
            if let Some(adjust) = self.timefail.get_adjust().await {
                info!("Time jump has happened, adjusting data with {adjust}");
                let count = self.ds.fix_timefail(adjust).await?;
                if count > 0 {
                    info!("Adjusted timestamps of {count} metrics");
                };
                // Remove the adjust marker only after data has been fixed, so
                // a failure here can be retried on the next periodic run.
                self.timefail
                    .remove_adjust()
                    .await
                    .inspect_err(|e| error!("Failed to remove timefail file. err={:?}", e))?;
            }
        }
        let stats = self
            .ds
            .get_statistics()
            .await
            .inspect_err(|e| error!("Failed to gather DB statistics: {:?}", e))?;
        info!("Current stats: {stats}");
        Ok(())
    }
    /// Test-only accessor for the underlying datastore.
    #[cfg(test)]
    pub const fn ds(&self) -> &Datastore {
        &self.ds
    }

    /// Create a [`Builder`] for configuring and constructing a `Logger`.
    #[must_use]
    pub const fn builder() -> Builder {
        Builder::new()
    }
}
77
78async fn get_mac() -> String {
79    use async_std::fs;
80    let wan = std::path::Path::new("/sys/class/net/wan/address");
81    let mut res = fs::read_to_string(&wan)
82        .await
83        .unwrap_or_else(|_| String::from("00:00:00:00:00:00"));
84    // Lowercase all ascii-chars ( leaving bytes with high bit untouched)
85    res.make_ascii_lowercase();
86    // Keep only 0-9 a-f
87    res.retain(|c| matches!(c, '0'..='9' | 'a'..='f'));
88    res
89}
90
// DBus interface implementation for the legacy fsipc protocol.
// `use_self` is allowed because the interface macro expansion trips clippy.
#[allow(clippy::use_self)]
#[interface(name = "se.modio.logger.fsipc")]
impl Logger {
    /// Liveness check; always answers the same string.
    #[allow(clippy::unused_self)]
    const fn ping(&self) -> &str {
        "Ping? Pong"
    }
    /// Return whether `key` is a valid metric key.
    #[allow(clippy::unused_self)]
    fn valid_key(&mut self, key: &str) -> bool {
        keys::valid_key(key).is_ok()
    }

    /// Return the box identity: the `wan` MAC address as hex digits only.
    #[allow(clippy::unused_self)]
    async fn get_boxid(&self) -> String {
        get_mac().await
    }

    /// Retrieve the last stored datapoint for `key`.
    ///
    /// Errors when the key is invalid or there is no data for it.
    #[instrument(skip(self))]
    async fn retrieve(&mut self, key: &str) -> Result<(fsipc::legacy::Measure,), LogErr> {
        keys::valid_key(key)?;
        let dat = self
            .ds
            .get_last_datapoint(key)
            .await
            // Having no data for a point is a common case ( point did not exist ) and thus not
            // worth treating as an error.
            .inspect_err(|e| debug!("Failed to retrieve last datapoint. err={:?}", e))?;
        let val = fsipc::legacy::Measure::from(dat);
        Ok((val,))
    }

    /// Retrieve the latest datapoint for every known key.
    #[instrument(skip(self))]
    async fn retrieve_all(&mut self) -> Result<Vec<fsipc::legacy::Measure>, LogErr> {
        let result = self
            .ds
            .get_latest_logdata()
            .await
            .inspect_err(|e| error!("Failed to get latest logdata. err={:?}", e))?;
        let res: Vec<fsipc::legacy::Measure> = result
            .into_iter()
            .map(fsipc::legacy::Measure::from)
            .collect();
        Ok(res)
    }

    /// Signal sent when a new value is stored
    #[zbus(signal)]
    async fn store_signal(
        ctxt: &SignalEmitter<'_>,
        key: &str,
        value: &str,
        when: u64,
    ) -> zbus::Result<()>;

    /// Store `value` under `key`, timestamped with the current time.
    ///
    /// Emits `store_signal` for non-internal (not "modio."-prefixed) keys.
    #[instrument(skip_all)]
    async fn store(
        &self,
        #[zbus(signal_context)] ctxt: SignalEmitter<'_>,
        key: &str,
        value: &str,
    ) -> Result<(), LogErr> {
        keys::valid_key(key)?;
        let when = fxtime();
        // Mark the datapoint if it was stored under known-bad time.
        let timefail = self.timefail.is_timefail();
        self.ds
            .insert(key, value, when, timefail)
            .await
            .inspect_err(|e| error!("Failed to store single metric. err={:?}", e))?;

        // The signal carries seconds as u64 while the datastore uses f64.
        #[allow(
            clippy::cast_precision_loss,
            clippy::cast_sign_loss,
            clippy::cast_possible_truncation
        )]
        let trunc_when = when as u64;

        if !key.starts_with("modio.") {
            Self::store_signal(&ctxt, key, value, trunc_when).await?;
        };
        Ok(())
    }

    /// Store `value` under `key` with a caller-supplied timestamp.
    ///
    /// Emits `store_signal` for non-internal (not "modio."-prefixed) keys.
    #[instrument(skip_all)]
    async fn store_with_time(
        &mut self,
        #[zbus(signal_context)] ctxt: SignalEmitter<'_>,
        key: &str,
        value: &str,
        when: u64,
    ) -> Result<(), LogErr> {
        keys::valid_key(key)?;
        // timefail is always false when storing with timestamp, as the timestamp is expected to be
        // correct from another source.
        let timefail = false;

        // if time wraps we have another problem.
        #[allow(clippy::cast_precision_loss)]
        let db_when = when as f64;

        self.ds
            .insert(key, value, db_when, timefail)
            .await
            .inspect_err(|e| error!("Failed to store single metric with timestamp. err={:?}", e))?;
        if !key.starts_with("modio.") {
            Self::store_signal(&ctxt, key, value, when).await?;
        };
        Ok(())
    }
    /// Signal sent when a new transaction is added
    #[zbus(signal)]
    async fn transaction_added(ctxt: &SignalEmitter<'_>, key: &str) -> zbus::Result<()>;

    /// Add a change-request transaction for `key`.
    ///
    /// Duplicate tokens are ignored (not an error) for backwards
    /// compatibility. Emits `transaction_added` on success.
    #[instrument(skip_all)]
    async fn transaction_add(
        &mut self,
        #[zbus(signal_context)] ctxt: SignalEmitter<'_>,
        key: &str,
        expected: &str,
        target: &str,
        token: &str,
    ) -> Result<(), LogErr> {
        keys::valid_key(key)?;
        keys::valid_token(token)?;
        if self
            .ds
            .has_transaction(token)
            .await
            .inspect_err(|e| error!("Failed to check for transaction for key. err={:?}", e))?
        {
            warn!(
                "Duplicate transaction (key: {}, token: {}) Ignoring for backwards compatibility.",
                key, token
            );
            return Ok(());
        }
        self.ds
            .transaction_add(key, expected, target, token)
            .await
            .inspect_err(|e| error!("Failed to add transaction for key. err={:?}", e))?;
        Self::transaction_added(&ctxt, key).await?;
        Ok(())
    }

    /// List open transactions whose key starts with `prefix`.
    #[instrument(skip(self))]
    async fn transaction_get(&mut self, prefix: &str) -> Result<Vec<Transaction>, LogErr> {
        debug!("Retrieving transactions beginning with {}", prefix);
        let res = self
            .ds
            .transaction_get(prefix)
            .await
            .inspect_err(|e| error!("Failed to get transaction for key. err={:?}", e))?;

        let res: Vec<Transaction> = res.into_iter().map(Transaction::from).collect();
        Ok(res)
    }

    /// Mark transaction `t_id` as failed.
    ///
    /// Errors with `TransactionNotFound` when no row was affected.
    #[instrument(skip(self))]
    async fn transaction_fail(&mut self, t_id: u64) -> Result<(), LogErr> {
        debug!("Marking transaction: t_id={}, failed", t_id);
        let timefail = self.timefail.is_timefail();

        // if it wraps, that is fine.
        #[allow(clippy::cast_possible_wrap)]
        let t_id = t_id as i64;
        let count = self
            .ds
            .transaction_fail(t_id, timefail)
            .await
            .inspect_err(|e| error!("Failed to mark transaction as failed. err={:?}", e))?;
        if count > 0 {
            Ok(())
        } else {
            Err(LogErr::TransactionNotFound)
        }
    }

    /// Mark transaction `t_id` as passed.
    ///
    /// Errors with `TransactionNotFound` when no row was affected.
    #[instrument(skip(self))]
    async fn transaction_pass(&mut self, t_id: u64) -> Result<(), LogErr> {
        debug!("Marking transaction: t_id={}, passed ", t_id);
        let timefail = self.timefail.is_timefail();
        // if it wraps, that is fine.
        #[allow(clippy::cast_possible_wrap)]
        let t_id = t_id as i64;
        let count = self
            .ds
            .transaction_pass(t_id, timefail)
            .await
            .inspect_err(|e| error!("Failed to mark transaction as passed. err={:?}", e))?;
        if count > 0 {
            Ok(())
        } else {
            Err(LogErr::TransactionNotFound)
        }
    }

    /// Prepare up to `maximum` (1..=250) datapoints for submission.
    ///
    /// Points remain available until removed via `remove_prepared`.
    #[instrument(skip(self))]
    async fn prepare_datapoints(&mut self, maximum: u32) -> Result<Vec<PreparedPoint>, LogErr> {
        prepare_range_check(maximum)?;
        let data = self
            .ds
            .get_batch(maximum)
            .await
            .inspect_err(|e| error!("Failed to get batch of datapoints. err={:?}", e))?;
        let result: Vec<PreparedPoint> = data.into_iter().map(PreparedPoint::from).collect();
        Ok(result)
    }

    /// Prepare up to `maximum` (1..=250) internal ("modio."-prefixed)
    /// datapoints for submission.
    #[instrument(skip(self))]
    async fn prepare_modio_datapoints(
        &mut self,
        maximum: u32,
    ) -> Result<Vec<PreparedPoint>, LogErr> {
        prepare_range_check(maximum)?;
        let data = self
            .ds
            .get_internal_batch(maximum)
            .await
            .inspect_err(|e| error!("Failed to get_internal_batch. err={:?}", e))?;
        let result: Vec<PreparedPoint> = data.into_iter().map(PreparedPoint::from).collect();
        Ok(result)
    }

    /// Delete previously prepared datapoints by id after submission.
    ///
    /// The id set must be non-empty and contain no negative ids.
    #[instrument(skip_all)]
    async fn remove_prepared(&mut self, items: Vec<i64>) -> Result<(), LogErr> {
        prepare_remove_check(&items)?;
        self.ds
            .drop_batch(&items)
            .await
            .inspect_err(|e| error!("Failed to delete batch of submitted items. err={:?}", e))?;
        Ok(())
    }
}
323
324/// Check of incoming values to be removed, Returns error on invalid items
325fn prepare_remove_check(items: &[i64]) -> Result<(), PreparedError> {
326    if items.is_empty() {
327        return Err(PreparedError::Empty);
328    }
329    if items.iter().any(|x| *x < 0_i64) {
330        return Err(PreparedError::InvalidIndex);
331    }
332    Ok(())
333}
334
/// Validation errors for the `prepare_*` / `remove_prepared` DBus calls.
#[derive(thiserror::Error, Debug)]
pub enum PreparedError {
    /// Requested batch size was zero.
    #[error("Too few items")]
    TooSmall,
    /// Requested batch size exceeded the allowed maximum.
    #[error("Too many items")]
    TooMany,
    /// No ids were supplied for removal.
    #[error("Empty item set")]
    Empty,
    /// A negative id was supplied for removal.
    #[error("Invalid index")]
    InvalidIndex,
}
346
347// Converting From a Prepared Error to LogError (which is used by DBus layer)
348impl From<PreparedError> for LogErr {
349    fn from(e: PreparedError) -> Self {
350        Self::InvalidPrepared(e.to_string())
351    }
352}
353
354/// Range check helper for `prepare_datapoints` and `prepare_modio_datapoints`
355///
356/// Errors:
357///    `PreparedError` with a specific error item.
358const fn prepare_range_check(num: u32) -> Result<(), PreparedError> {
359    match num {
360        0 => Err(PreparedError::TooSmall),
361        1..=250 => Ok(()),
362        _ => Err(PreparedError::TooMany),
363    }
364}
365
366#[cfg(test)]
367pub mod tests {
368    use super::*;
369
370    use crate::testing::{Tempbase, TestServer};
371    use fsipc::unixtime;
372    use std::error::Error;
373    use test_log::test;
374    use timeout_macro::timeouttest;
375
376    type TestResult = Result<(), Box<dyn Error>>;
377
378    use futures_util::{FutureExt, StreamExt};
379
380    #[test(timeouttest)]
381    async fn ping_pong_test() -> TestResult {
382        let server = TestServer::new(line!()).await?;
383        let proxy = server.proxy().await?;
384        let first = proxy.ping().await?;
385        let second = proxy.ping().await?;
386        assert_eq!(first, "Ping? Pong");
387        assert_eq!(second, "Ping? Pong");
388        Ok(())
389    }
390
391    #[test(timeouttest)]
392    async fn done_gives_error_test() -> TestResult {
393        let server = TestServer::new(line!()).await?;
394        let proxy = server.proxy().await?;
395        let res = proxy.done().await;
396        assert!(res.is_err());
397        Ok(())
398    }
399
400    #[test(timeouttest)]
401    async fn store_retrieve() -> TestResult {
402        let server = TestServer::new(line!()).await?;
403        let proxy = server.proxy().await?;
404        proxy.store("test.key", "abc123").await?;
405        let m = proxy.retrieve("test.key").await?;
406        assert_eq!(m.key, "test.key");
407        assert_eq!(m.value, "abc123");
408        Ok(())
409    }
410
    /// Storing twice with explicit timestamps: retrieve returns the latest.
    #[test(timeouttest)]
    async fn store_buffer() -> TestResult {
        let server = TestServer::new(line!()).await?;
        let proxy = server.proxy().await?;
        proxy
            .store_with_time("test.key", "abc123", 1_494_602_107)
            .await?;
        proxy
            .store_with_time("test.key", "abc1234", 1_494_602_108)
            .await?;
        let m = proxy.retrieve("test.key").await?;
        assert_eq!(m.key, "test.key");
        assert_eq!(m.value, "abc1234");
        assert_eq!(m.timestamp, 1_494_602_108);
        Ok(())
    }
427
    /// retrieve_all yields one (latest) entry per distinct key.
    #[test(timeouttest)]
    async fn retrieve_all_test() -> TestResult {
        let server = TestServer::new(line!()).await?;
        let proxy = server.proxy().await?;
        debug!("I have a party here");
        proxy
            .store_with_time("test.key", "abc123", 1_494_602_107)
            .await?;
        proxy
            .store_with_time("test.key", "abc1234", 1_494_602_108)
            .await?;
        proxy.store("test.key2", "abcdefg").await?;

        // Two keys stored => two entries, each carrying the newest value.
        let all = proxy.retrieve_all().await?;
        assert_eq!(all.len(), 2);
        let m0 = all.first().expect("Should have value");
        assert_eq!(m0.key, "test.key");
        assert_eq!(m0.value, "abc1234");
        assert_eq!(m0.timestamp, 1_494_602_108);
        let m1 = all.get(1).expect("Should have value");
        assert_eq!(m1.key, "test.key2");
        assert_eq!(m1.value, "abcdefg");
        Ok(())
    }
452
    /// transaction_get filters transactions by key prefix.
    #[test(timeouttest)]
    async fn transaction_adding_test() -> TestResult {
        let server = TestServer::new(line!()).await?;
        let proxy = server.proxy().await?;
        proxy
            .transaction_add("test.test.one", "first", "second", "012")
            .await?;
        proxy
            .transaction_add("dummy.test.one", "should not", "be present", "013")
            .await?;
        let transactions = proxy.transaction_get("test.test").await?;
        assert_eq!(transactions.len(), 1);
        let res = &transactions[0];
        assert_eq!(res.key, "test.test.one");
        assert_eq!(res.t_id, 1, "Transaction ID mismatch");
        Ok(())
    }
470
    /// Adding the same token twice is silently ignored (backwards compat).
    #[test(timeouttest)]
    async fn transaction_dupe_adding_test() -> TestResult {
        let server = TestServer::new(line!()).await?;
        let proxy = server.proxy().await?;
        proxy
            .transaction_add("test.test.one", "first", "second", "1638290048")
            .await?;
        let res = proxy
            .transaction_add("test.test.one", "first", "second", "1638290048")
            .await;
        res.expect("duplicated tokens should not cause error");
        let transactions = proxy.transaction_get("test.test").await?;
        assert_eq!(transactions.len(), 1);
        Ok(())
    }
486
    /// Adding a transaction emits `transaction_added` carrying the key.
    #[test(timeouttest)]
    async fn transaction_signal_test() -> TestResult {
        let server = TestServer::new(line!()).await?;
        let logger = server.proxy().await?;
        // Subscribe before triggering, so the signal is not missed.
        let mut stream = logger.receive_transaction_added().await?;
        logger
            .transaction_add("test.test.transaction_signal", "first", "second", "012")
            .await?;

        let signal = stream.next().await.unwrap();
        let payload = signal.args()?;
        assert_eq!(payload.key, "test.test.transaction_signal");
        Ok(())
    }
501
    /// Passed/failed transactions disappear; transaction ids keep counting up.
    #[test(timeouttest)]
    async fn transaction_passing_test() -> TestResult {
        let server = TestServer::new(line!()).await?;
        let logger = server.proxy().await?;
        logger
            .transaction_add("test.test.one", "first", "second", "012")
            .await?;
        logger
            .transaction_add("test.test.two", "uno", "dos", "0113")
            .await?;
        let trans = logger.transaction_get("test.test").await?;
        // Resolve both open transactions, one failed and one passed.
        logger.transaction_fail(trans[0].t_id).await?;
        logger.transaction_pass(trans[1].t_id).await?;
        logger
            .transaction_add("test.test.three", "etta", "tvåa", "0114")
            .await?;
        let transactions = logger.transaction_get("test.test").await?;
        assert_eq!(transactions.len(), 1);
        let res = &transactions[0];
        assert_eq!(res.key, "test.test.three");
        assert_eq!(res.t_id, 3, "Transaction id mismatch");
        Ok(())
    }
525
    /// Every key returned by retrieve_all can be retrieved individually.
    #[test(timeouttest)]
    async fn retrieving_data_test() -> TestResult {
        let server = TestServer::new(line!()).await?;
        let ipc = server.proxy().await?;
        ipc.store("test.test.one", "first").await?;
        ipc.store("test.test.one", "second").await?;
        ipc.store("test.test.one", "third").await?;

        ipc.store("test.test.two", "1").await?;
        ipc.store("test.test.two", "2").await?;
        ipc.store("test.test.two", "3").await?;

        let res = ipc.retrieve_all().await?;
        for measure in &res {
            let data = ipc.retrieve(&measure.key).await?;
            assert_eq!(data.key, measure.key);
        }
        Ok(())
    }
545
546    #[test(timeouttest)]
547    async fn valid_key_test() -> TestResult {
548        let server = TestServer::new(line!()).await?;
549        let ipc = server.proxy().await?;
550        assert!(ipc.valid_key("modio.software.development").await?);
551        assert!(ipc.valid_key("abc").await?);
552        assert!(ipc.valid_key("a.b.c").await?);
553        assert!(ipc.valid_key("a_b.c").await?);
554        Ok(())
555    }
556
557    #[test(timeouttest)]
558    async fn invalid_key_test() -> TestResult {
559        let server = TestServer::new(line!()).await?;
560        let ipc = server.proxy().await?;
561        assert!(!ipc.valid_key("modio..invalid").await?);
562        assert!(!ipc.valid_key(".modio..invalid").await?);
563        assert!(!ipc.valid_key("modio.invalid.").await?);
564        assert!(!ipc.valid_key("modio. invalid").await?);
565        assert!(!ipc.valid_key("modio.in valid").await?);
566        assert!(!ipc.valid_key("modio.invalid ").await?);
567        assert!(!ipc.valid_key(" modio.invalid").await?);
568        Ok(())
569    }
570
    /// Passing the same transaction twice: the second attempt must error.
    #[test(timeouttest)]
    async fn transaction_double() -> TestResult {
        let server = TestServer::new(line!()).await?;
        let ipc = server.proxy().await?;
        let key = "test.test.one";
        let first = "first";
        let second = "second";
        let guid4 = "02558af4-ed83-4441-bc56-4bdf3c57092f";
        // Store the key
        ipc.store(key, first).await?;

        ipc.transaction_add(key, first, second, guid4).await?;
        let transactions = ipc.transaction_get(key).await?;
        let first_transaction = transactions
            .first()
            .expect("Should have at least one transaction");
        let res = ipc.transaction_pass(first_transaction.t_id).await;
        assert!(res.is_ok());
        // Second pass of an already-resolved transaction is an error.
        let res = ipc.transaction_pass(first_transaction.t_id).await;
        assert!(res.is_err());
        Ok(())
    }
593
    /// Full transaction round-trip: add three, pass/fail by expected value,
    /// then verify the transaction queue drains to empty.
    #[test(timeouttest)]
    async fn transaction_tests() -> TestResult {
        let server = TestServer::new(line!()).await?;
        let ipc = server.proxy().await?;
        let key = "test.test.one";
        let first = "first";
        let mut our_value = "first";
        let second = "second";

        ipc.store(key, our_value).await?;
        let when = unixtime();

        ipc.store_with_time(key, our_value, when).await?;

        let guid1 = "04b56dcd-527a-4dd7-909c-a4111c035cbb";
        ipc.transaction_add(key, first, second, guid1).await?;
        let guid2 = "0000111001110222";
        ipc.transaction_add(key, first, second, guid2).await?;
        let guid3 = "ff505709-0119-46f4-87c8-f67f0a0d953b";
        ipc.transaction_add(key, first, second, guid3).await?;

        let transactions = ipc.transaction_get(key).await?;
        // The "id" here is an internal ID in the current logger instance, not the same as
        // our GUID ID we entered above.
        for trn in &transactions {
            if trn.key == key {
                if our_value == trn.expected {
                    our_value = &trn.target;
                    ipc.transaction_pass(trn.t_id).await?;
                } else {
                    ipc.transaction_fail(trn.t_id).await?;
                }
            }
            ipc.store(key, our_value).await?;
        }
        // Transaction  get should _succeed_ but return empty data.
        let res = ipc.transaction_get(key).await?;
        assert_eq!(res.len(), 0);

        Ok(())
    }
635
    /// Stores to "modio."-prefixed keys must not emit a store signal.
    #[test(timeouttest)]
    async fn no_modio_signals() -> TestResult {
        let server = TestServer::new(line!()).await?;
        let ipc = server.proxy().await?;
        let mut stream = ipc.receive_store_signal().await?;
        // Store a plain value; this should emit a store signal.
        ipc.store("test.test.test", "value").await?;
        let first = stream.next().await.unwrap();
        let payload = first.args()?;
        assert_eq!(payload.key, "test.test.test");

        // A store to a "modio." prefix key should not end up in a signal
        // Therefore we should have an empty result here.
        ipc.store("modio.test.test", "value").await?;
        let second = stream.next().now_or_never();
        assert!(second.is_none());
        Ok(())
    }
654
    /// Prepared datapoints stay available until explicitly removed.
    #[test(timeouttest)]
    async fn submit_consume() -> TestResult {
        let server = TestServer::new(line!()).await?;
        let ipc = server.proxy().await?;
        // Timestamps 100s in the past so the buffer is considered flushable.
        for x in 0..5 {
            ipc.store_with_time("test.foo", &x.to_string(), unixtime() - 100)
                .await?;
        }
        let vals = ipc.prepare_datapoints(10).await?;
        assert_eq!(vals.len(), 5);

        let point = &vals[0];
        assert_eq!(point.key, "test.foo");
        assert_eq!(point.value, "0");

        let more = ipc.prepare_datapoints(10).await?;
        // Should be available still
        assert_eq!(more.len(), 5);

        let last = &vals[4];
        assert_eq!(last.key, "test.foo");
        assert_eq!(last.value, "4");

        let to_remove: Vec<_> = vals.iter().map(|m| m.id).collect();

        // After removal, no prepared points remain.
        ipc.remove_prepared(to_remove).await?;
        let after = ipc.prepare_datapoints(30).await?;
        assert!(after.is_empty());
        Ok(())
    }
685
    /// prepare_modio_datapoints only yields "modio."-prefixed points, and
    /// removing them leaves the non-modio points untouched.
    #[test(timeouttest)]
    async fn submit_modio_consume() -> TestResult {
        let server = TestServer::new(line!()).await?;
        let ipc = server.proxy().await?;
        for x in 0..5 {
            ipc.store_with_time("test.foo", &x.to_string(), unixtime() - 100)
                .await?;
            ipc.store_with_time("modio.test.foo", &x.to_string(), unixtime() - 100)
                .await?;
        }

        let vals = ipc.prepare_modio_datapoints(20).await?;
        assert_eq!(vals.len(), 5);
        for point in &vals {
            assert_eq!(&point.key, "modio.test.foo");
        }

        let more = ipc.prepare_modio_datapoints(10).await?;
        // Should be available still
        assert_eq!(more.len(), 5);

        // Figure out how to remove the ID's here
        //
        let to_remove: Vec<i64> = vals.iter().map(|x| x.id).collect();
        ipc.remove_prepared(to_remove).await?;

        let modio_after = ipc.prepare_modio_datapoints(30).await?;
        assert!(modio_after.is_empty());

        // Non-internal points were not removed by the modio removal above.
        let after = ipc.prepare_datapoints(30).await?;
        assert!(!after.is_empty());
        Ok(())
    }
719
    // Prepare datapoints should cause a write to disk only if the buffer is either full or too
    // old, as for how the database considers things.
    #[test(timeouttest)]
    async fn test_get_batch_expiry() -> TestResult {
        let server = TestServer::new(line!()).await?;
        let ipc = server.proxy().await?;
        for x in 0..25 {
            ipc.store("test.foo", &x.to_string()).await?;
        }
        let vals = ipc.prepare_datapoints(10).await?;
        assert_eq!(
            vals.len(),
            0,
            "Data should not automatically be flushed to disk"
        );

        // An old timestamp (200s in the past) makes the buffer flushable.
        ipc.store_with_time("test.foo", "26", unixtime() - 200)
            .await?;
        let vals = ipc.prepare_datapoints(10).await?;
        assert_eq!(
            vals.len(),
            10,
            "Oldest data in buffer should cause a flush to disk"
        );
        Ok(())
    }
746
    // Prepare (internal) datapoints should cause a write to disk only if the buffer is either full
    // or too old, as for how the database considers things.
    #[test(timeouttest)]
    async fn test_internal_get_batch_expiry() -> TestResult {
        let server = TestServer::new(line!()).await?;
        let ipc = server.proxy().await?;
        for x in 0..25 {
            ipc.store("modio.test.foo", &x.to_string()).await?;
            ipc.store("test.foo", &x.to_string()).await?;
        }
        let vals = ipc.prepare_modio_datapoints(10).await?;
        assert_eq!(
            vals.len(),
            0,
            "Data should not automatically be flushed to disk"
        );

        // An old timestamp (200s in the past) makes the buffer flushable.
        ipc.store_with_time("modio.test.foo", "26", unixtime() - 200)
            .await?;
        let vals = ipc.prepare_modio_datapoints(10).await?;
        assert_eq!(
            vals.len(),
            10,
            "Oldest data in buffer should cause a flush to disk"
        );
        Ok(())
    }
774
    /// Batches come out in insertion order and can be consumed in chunks.
    #[test(timeouttest)]
    async fn test_get_batch() -> TestResult {
        let server = TestServer::new(line!()).await?;
        let ipc = server.proxy().await?;
        for x in 0..25 {
            ipc.store_with_time("test.foo", &x.to_string(), unixtime() - 200)
                .await?;
        }

        let vals = ipc.prepare_datapoints(10).await?;
        assert_eq!(vals.len(), 10);
        let point = &vals[0];
        assert_eq!(point.key, "test.foo");
        assert_eq!(point.value, "0");

        let to_remove: Vec<i64> = vals.iter().map(|x| x.id).collect();
        ipc.remove_prepared(to_remove).await?;

        // Request more than we have left, should return only the 25-10=15 items
        let vals = ipc.prepare_datapoints(30).await?;
        assert_eq!(vals.len(), 15);
        let point = &vals[0];
        assert_eq!(point.key, "test.foo");
        assert_eq!(point.value, "10");

        let to_remove: Vec<i64> = vals.iter().map(|x| x.id).collect();
        ipc.remove_prepared(to_remove).await?;

        let vals = ipc.prepare_datapoints(30).await?;
        assert!(vals.is_empty());
        Ok(())
    }
807
808    #[test(timeouttest)]
809    /// Technically, this test might fail if you have an ethernet interface named "wan"
810    /// I'm ok with that for now as that is unlikely to happen, if it does, disable the test, or
811    /// check it as something better
812    async fn test_get_mac() -> TestResult {
813        let server = TestServer::new(line!()).await?;
814        let logger = server.proxy().await?;
815        let res = logger.get_boxid().await?;
816        assert_eq!(res, "000000000000");
817        Ok(())
818    }
819
820    #[test(timeouttest)]
821    async fn test_empty_transactions() -> TestResult {
822        let server = TestServer::new(line!()).await?;
823        let ipc = server.proxy().await?;
824        let transactions = ipc.transaction_get("test").await?;
825        assert_eq!(transactions.len(), 0);
826        let values = ipc.retrieve_all().await?;
827        assert_eq!(values.len(), 0);
828        Ok(())
829    }
830
831    #[test(timeouttest)]
832    async fn test_transaction_get_prefix() -> TestResult {
833        let server = TestServer::new(line!()).await?;
834        let ipc = server.proxy().await?;
835        let transactions = ipc.transaction_get("mbus.").await?;
836        assert_eq!(transactions.len(), 0);
837        Ok(())
838    }
839
    // This test is special.
    // It dupes the logic for the temp server in order to start it several times and compare
    // results between executions, in order to make sure that temp data (ram only) is accessible
    // later.
    #[test(timeouttest)]
    async fn resume_database_test() -> TestResult {
        let base = Tempbase::default();
        let dbfile = Tempbase::dbfile();
        let name = format!("se.modio.logger.TestResumeDb{}", line!());

        let expected = vec![
            ("test.test.one", "1"),
            ("test.test.two", "2"),
            ("test.test.three", "3"),
            ("test.test.four", "4"),
            ("test.test.five", "5"),
            ("test.test.six", "6"),
        ];

        // Spawn server, store some data and then shut it down
        {
            let name = name.clone();
            let base = base.clone();
            let dbfile = dbfile.clone();
            let server = TestServer::new_with_base(name, base, dbfile).await?;
            let logger = server.proxy().await?;
            info!("Filling datastore");
            for (key, val) in &expected {
                logger.store(key, val).await?;
            }
            // Read the data to ensure we have it in storage
            for (key, value) in &expected {
                let data = logger.retrieve(key).await?;
                assert_eq!(&data.value, value);
                assert_eq!(&data.key, key);
            }
            info!("Done, settling things");
        };

        // Spawn another server, retrieving the previous data and ensure it is the same.
        {
            let server = TestServer::new_with_base(name, base, dbfile).await?;
            let logger = server.proxy().await?;
            info!("Checking content after re-start");
            for (key, value) in &expected {
                let data = logger.retrieve(key).await?;
                assert_eq!(&data.value, value);
                assert_eq!(&data.key, key);
            }
        }
        Ok(())
    }
892
893    // This is a big test-case that covers multiple concurrent reader-writers while running
894    // maintenance jobs at the same time.
895    // The test triggers load problems with the system if there are too few DB connections, and
896    // more.
897    #[allow(clippy::assertions_on_constants)]
898    #[test(timeouttest)]
899    async fn load_and_maintain() -> TestResult {
900        let server = TestServer::new(line!()).await?;
901        let ipc = server.proxy().await?;
902        let logger1 = server.logger1().await?;
903        let submit1 = server.submit1().await?;
904        use crate::testing::proxytest::{Logger1Proxy, Submit1Proxy};
905        use crate::LOGGER_PATH;
906        use async_std::task::{sleep, spawn};
907        use fsipc::legacy::fsipcProxy;
908        use futures::try_join;
909        use std::collections::HashMap;
910        use std::sync::atomic::{AtomicBool, Ordering};
911        use std::sync::Arc;
912        use std::time::Duration;
913
914        // Atomic flag that the tasks check before writing, prevents them from causing trouble once
915        // we reach shutdown
916        let stop = Arc::new(AtomicBool::new(false));
917        // Amount of keys to have traffic for that are using "one write per key" mode.
918        const NUM_SLOW_KEYS: usize = 1000;
919        const NUM_TRANSACTIONS: usize = 50;
920        assert!(
921            NUM_TRANSACTIONS < NUM_SLOW_KEYS,
922            "Transactions work on slow keys."
923        );
924        const NUM_BULK_KEYS: usize = 2000;
925        // How many items the submitter will consume each loop
926        const NUM_SUBMIT_DATA: usize = 200;
927
928        // How many loops to run for each of maintenance / clean until we are done
929        // The test will end after either of these reach their limit, or something fails.
930        const NUM_CLEAN_LOOPS: usize = 50;
931        const NUM_MAINT_LOOPS: usize = 50;
932
933        // 1 task to generate "trickle" of data, using ipc.store with delays.
934        //  ( also sets metadata for each iteration )
935        async fn trickle_gen(
936            stop: Arc<AtomicBool>,
937            ipc: fsipcProxy<'static>,
938            logger1: Logger1Proxy<'static>,
939        ) -> zbus::Result<()> {
940            loop {
941                for n in 0..NUM_SLOW_KEYS {
942                    let key = format!("test.trickle.key.{n}");
943                    let name = format!("Name of tricke test key #{n}");
944                    let desc = format!("Description {n} of trickle test key #{n}");
945                    let val = format!("{n}");
946                    logger1.set_metadata_name(&key, &name).await?;
947                    logger1.set_metadata_description(&key, &desc).await?;
948                    ipc.store(&key, &val).await?;
949                }
950                if stop.load(Ordering::Relaxed) {
951                    break Ok(());
952                }
953            }
954        }
955        // 1 task to generate "bulk" of data, using logger1.store func
956        async fn bulk_gen(
957            stop: Arc<AtomicBool>,
958            logger1: Logger1Proxy<'static>,
959        ) -> zbus::Result<()> {
960            let delay = Duration::from_millis(1);
961            loop {
962                let mut map = HashMap::with_capacity(NUM_BULK_KEYS);
963                for n in 0..=NUM_BULK_KEYS {
964                    let key = format!("test.bulk.key.{n}");
965                    let val = format!("{n}");
966                    map.insert(key, val.into());
967                }
968                logger1.store_batch(map).await?;
969                sleep(delay).await;
970                if stop.load(Ordering::Relaxed) {
971                    break Ok(());
972                }
973            }
974        }
975
976        // 1 task to pretend to submit, calling "get all metadata" and then "prepare batch"
977        async fn submit_gen(
978            stop: Arc<AtomicBool>,
979            ipc: fsipcProxy<'static>,
980            submit1: Submit1Proxy<'static>,
981        ) -> zbus::Result<()> {
982            let delay = Duration::from_millis(5);
983            loop {
984                if stop.load(Ordering::Relaxed) {
985                    break Ok(());
986                }
987                let _meta = submit1.get_all_metadata().await?;
988                let dp = ipc.prepare_datapoints(NUM_SUBMIT_DATA as u32).await?;
989                let to_remove: Vec<_> = dp.iter().map(|m| m.id).collect();
990                if to_remove.is_empty() {
991                    sleep(delay).await;
992                } else {
993                    ipc.remove_prepared(to_remove).await?;
994                }
995            }
996        }
997
998        // 1 task to poll keys and add transactions
999        async fn tran_gen(stop: Arc<AtomicBool>, ipc: fsipcProxy<'static>) -> zbus::Result<()> {
1000            let delay = Duration::from_millis(5);
1001            let mut t_id = 0;
1002            loop {
1003                // Sleep first to give the trickle code a chance to add measures. Doing it that way
1004                // cuts down on errors.
1005                sleep(delay).await;
1006                if stop.load(Ordering::Relaxed) {
1007                    break Ok(());
1008                }
1009                for n in 0..=NUM_TRANSACTIONS {
1010                    t_id += 1;
1011                    let transaction_id = format!("{t_id}");
1012                    let key = format!("test.trickle.key.{n}");
1013                    // No datapoint exists yet, that is not a problem for this code
1014                    if let Ok(val) = ipc.retrieve(&key).await {
1015                        ipc.transaction_add(&val.key, &val.value, "0", &transaction_id)
1016                            .await?;
1017                    }
1018                }
1019            }
1020        }
1021
1022        async fn maint_task(stop: Arc<AtomicBool>, conn: zbus::Connection) -> zbus::Result<()> {
1023            let delay = Duration::from_millis(5);
1024            use crate::LOGGER_PATH;
1025            let iface_ref = conn
1026                .object_server()
1027                .interface::<_, Logger>(LOGGER_PATH)
1028                .await?;
1029            for _ in 0..=NUM_MAINT_LOOPS {
1030                sleep(delay).await;
1031                if stop.load(Ordering::Relaxed) {
1032                    return Ok(());
1033                }
1034                iface_ref
1035                    .get_mut()
1036                    .await
1037                    .periodic()
1038                    .await
1039                    // Since the proxies return a zbus::Error<()> and we want to use try_join below, we
1040                    // need to return the same kind of error, and I don't feel like implementing the
1041                    // conversions just for some test-cases.
1042                    .map_err(|e| zbus::Error::Failure(e.to_string()))?;
1043            }
1044            stop.store(true, Ordering::Relaxed);
1045            Ok(())
1046        }
1047
1048        async fn clean_task(stop: Arc<AtomicBool>, conn: zbus::Connection) -> zbus::Result<()> {
1049            let delay = Duration::from_millis(5);
1050            use crate::LOGGER_PATH;
1051            let iface_ref = conn
1052                .object_server()
1053                .interface::<_, Logger>(LOGGER_PATH)
1054                .await?;
1055            let pool = iface_ref.get_mut().await.ds().pool();
1056            for _ in 0..=NUM_CLEAN_LOOPS {
1057                sleep(delay).await;
1058                if stop.load(Ordering::Relaxed) {
1059                    return Ok(());
1060                }
1061                Datastore::clean_maintenance(pool.clone())
1062                    .await
1063                    .map_err(|e| zbus::Error::Failure(e.to_string()))?;
1064            }
1065            stop.store(true, Ordering::Relaxed);
1066            Ok(())
1067        }
1068
1069        let trickle = spawn(trickle_gen(stop.clone(), ipc.clone(), logger1.clone()));
1070        let bulk = spawn(bulk_gen(stop.clone(), logger1.clone()));
1071        let submit = spawn(submit_gen(stop.clone(), ipc.clone(), submit1.clone()));
1072        let trans = spawn(tran_gen(stop.clone(), ipc.clone()));
1073        let maint = spawn(maint_task(stop.clone(), server.conn.clone()));
1074        let cleant = spawn(clean_task(stop.clone(), server.conn.clone()));
1075        let iface_ref = server
1076            .conn
1077            .object_server()
1078            .interface::<_, Logger>(LOGGER_PATH)
1079            .await?;
1080        /* Artificially  drain a few connetions from our pool. default pool is 4, so we steal 2
1081         * more. 1 for maintenance and one for the pool. */
1082        let _conn4 = iface_ref.get_mut().await.ds().pool().acquire().await?;
1083        let _conn5 = iface_ref.get_mut().await.ds().pool().acquire().await?;
1084        try_join!(trickle, bulk, submit, trans, maint, cleant)?;
1085        stop.store(true, Ordering::Relaxed);
1086
1087        Ok(())
1088    }
1089}