mocklogger/
logger.rs

1// Author: D.S. Ljungmark <spider@skuggor.se>, Modio AB
2// SPDX-License-Identifier: AGPL-3.0-or-later
3#![allow(clippy::module_name_repetitions)]
4mod builder;
5mod errors;
6pub(crate) mod keys;
7mod ping;
8use super::timefail;
9pub use builder::Builder;
10pub use errors::LogErr;
11use fsipc::legacy::{PreparedPoint, Transaction};
12use modio_logger_db::{fxtime, Datastore};
13pub use ping::LoggerPing;
14use tracing::{debug, error, info, instrument, warn};
15use zbus::{interface, SignalContext};
16
17#[derive(Clone)]
18pub struct Logger {
19    ds: Datastore,
20    timefail: timefail::Timefail,
21}
22
23impl Logger {
24    pub async fn new(timefail: timefail::Timefail, ds: Datastore) -> Result<Self, LogErr> {
25        if timefail.is_timefail() {
26            info!("Failing all pending change requests due to TIMEFAIL");
27            ds.transaction_fail_pending()
28                .await
29                .inspect_err(|e| error!("Failed to remove pending transactions: {:?}", e))?;
30        }
31        ds.fail_queued_transactions()
32            .await
33            .inspect_err(|e| error!("Failed to remove queued transactions: {:?}", e))?;
34        Ok(Self { ds, timefail })
35    }
36
37    #[instrument(skip(self))]
38    pub async fn periodic(&self) -> Result<(), LogErr> {
39        debug!("Doing periodic tasks, persistance, timefail");
40        if self.ds.should_persist().await? {
41            self.ds
42                .persist_data()
43                .await
44                .inspect_err(|e| error!("Failed to persist datastore: {:?}", e))?;
45        };
46        if self.timefail.is_adjust() {
47            if let Some(adjust) = self.timefail.get_adjust().await {
48                info!("Time jump has happened, adjusting data with {adjust}");
49                let count = self.ds.fix_timefail(adjust).await?;
50                if count > 0 {
51                    info!("Adjusted timestamps of {count} metrics");
52                };
53                self.timefail
54                    .remove_adjust()
55                    .await
56                    .inspect_err(|e| error!("Failed to remove timefail file. err={:?}", e))?;
57            }
58        }
59        let stats = self
60            .ds
61            .get_statistics()
62            .await
63            .inspect_err(|e| error!("Failed to gather DB statistics: {:?}", e))?;
64        info!("Current stats: {stats}");
65        Ok(())
66    }
67    #[cfg(test)]
68    pub const fn ds(&self) -> &Datastore {
69        &self.ds
70    }
71
72    #[must_use]
73    pub const fn builder() -> Builder {
74        Builder::new()
75    }
76}
77
78async fn get_mac() -> String {
79    use async_std::fs;
80    let wan = std::path::Path::new("/sys/class/net/wan/address");
81    let mut res = fs::read_to_string(&wan)
82        .await
83        .unwrap_or_else(|_| String::from("00:00:00:00:00:00"));
84    // Lowercase all ascii-chars ( leaving bytes with high bit untouched)
85    res.make_ascii_lowercase();
86    // Keep only 0-9 a-f
87    res.retain(|c| matches!(c, '0'..='9' | 'a'..='f'));
88    res
89}
90
91#[allow(clippy::use_self)]
92#[interface(name = "se.modio.logger.fsipc")]
93impl Logger {
94    #[allow(clippy::unused_self)]
95    const fn ping(&self) -> &str {
96        "Ping? Pong"
97    }
98    #[allow(clippy::unused_self)]
99    fn valid_key(&mut self, key: &str) -> bool {
100        keys::valid_key(key).is_ok()
101    }
102
103    #[allow(clippy::unused_self)]
104    async fn get_boxid(&self) -> String {
105        get_mac().await
106    }
107
108    #[instrument(skip(self))]
109    async fn retrieve(&mut self, key: &str) -> Result<(fsipc::legacy::Measure,), LogErr> {
110        keys::valid_key(key)?;
111        let dat = self
112            .ds
113            .get_last_datapoint(key)
114            .await
115            // Having no data for a point is a common case ( point did not exist ) and thus not
116            // worth treating as an error.
117            .inspect_err(|e| debug!("Failed to retrieve last datapoint. err={:?}", e))?;
118        let val = fsipc::legacy::Measure::from(dat);
119        Ok((val,))
120    }
121
122    #[instrument(skip(self))]
123    async fn retrieve_all(&mut self) -> Result<Vec<fsipc::legacy::Measure>, LogErr> {
124        let result = self
125            .ds
126            .get_latest_logdata()
127            .await
128            .inspect_err(|e| error!("Failed to get latest logdata. err={:?}", e))?;
129        let res: Vec<fsipc::legacy::Measure> = result
130            .into_iter()
131            .map(fsipc::legacy::Measure::from)
132            .collect();
133        Ok(res)
134    }
135
136    /// Signal sent when a new value is stored
137    #[zbus(signal)]
138    async fn store_signal(
139        ctxt: &SignalContext<'_>,
140        key: &str,
141        value: &str,
142        when: u64,
143    ) -> zbus::Result<()>;
144
145    #[instrument(skip_all)]
146    async fn store(
147        &self,
148        #[zbus(signal_context)] ctxt: SignalContext<'_>,
149        key: &str,
150        value: &str,
151    ) -> Result<(), LogErr> {
152        keys::valid_key(key)?;
153        let when = fxtime();
154        let timefail = self.timefail.is_timefail();
155        self.ds
156            .insert(key, value, when, timefail)
157            .await
158            .inspect_err(|e| error!("Failed to store single metric. err={:?}", e))?;
159
160        #[allow(
161            clippy::cast_precision_loss,
162            clippy::cast_sign_loss,
163            clippy::cast_possible_truncation
164        )]
165        let trunc_when = when as u64;
166
167        if !key.starts_with("modio.") {
168            Self::store_signal(&ctxt, key, value, trunc_when).await?;
169        };
170        Ok(())
171    }
172
173    #[instrument(skip_all)]
174    async fn store_with_time(
175        &mut self,
176        #[zbus(signal_context)] ctxt: SignalContext<'_>,
177        key: &str,
178        value: &str,
179        when: u64,
180    ) -> Result<(), LogErr> {
181        keys::valid_key(key)?;
182        // timefail is always false when storing with timestamp, as the timestamp is expected to be
183        // correct from another source.
184        let timefail = false;
185
186        // if time wraps we have another problem.
187        #[allow(clippy::cast_precision_loss)]
188        let db_when = when as f64;
189
190        self.ds
191            .insert(key, value, db_when, timefail)
192            .await
193            .inspect_err(|e| error!("Failed to store single metric with timestamp. err={:?}", e))?;
194        if !key.starts_with("modio.") {
195            Self::store_signal(&ctxt, key, value, when).await?;
196        };
197        Ok(())
198    }
199    /// Signal sent when a new transaction is added
200    #[zbus(signal)]
201    async fn transaction_added(ctxt: &SignalContext<'_>, key: &str) -> zbus::Result<()>;
202
203    #[instrument(skip_all)]
204    async fn transaction_add(
205        &mut self,
206        #[zbus(signal_context)] ctxt: SignalContext<'_>,
207        key: &str,
208        expected: &str,
209        target: &str,
210        token: &str,
211    ) -> Result<(), LogErr> {
212        keys::valid_key(key)?;
213        keys::valid_token(token)?;
214        if self
215            .ds
216            .has_transaction(token)
217            .await
218            .inspect_err(|e| error!("Failed to check for transaction for key. err={:?}", e))?
219        {
220            warn!(
221                "Duplicate transaction (key: {}, token: {}) Ignoring for backwards compatibility.",
222                key, token
223            );
224            return Ok(());
225        }
226        self.ds
227            .transaction_add(key, expected, target, token)
228            .await
229            .inspect_err(|e| error!("Failed to add transaction for key. err={:?}", e))?;
230        Self::transaction_added(&ctxt, key).await?;
231        Ok(())
232    }
233
234    #[instrument(skip(self))]
235    async fn transaction_get(&mut self, prefix: &str) -> Result<Vec<Transaction>, LogErr> {
236        debug!("Retrieving transactions beginning with {}", prefix);
237        let res = self
238            .ds
239            .transaction_get(prefix)
240            .await
241            .inspect_err(|e| error!("Failed to get transaction for key. err={:?}", e))?;
242
243        let res: Vec<Transaction> = res.into_iter().map(Transaction::from).collect();
244        Ok(res)
245    }
246
247    #[instrument(skip(self))]
248    async fn transaction_fail(&mut self, t_id: u64) -> Result<(), LogErr> {
249        debug!("Marking transaction: t_id={}, failed", t_id);
250        let timefail = self.timefail.is_timefail();
251
252        // if it wraps, that is fine.
253        #[allow(clippy::cast_possible_wrap)]
254        let t_id = t_id as i64;
255        let count = self
256            .ds
257            .transaction_fail(t_id, timefail)
258            .await
259            .inspect_err(|e| error!("Failed to mark transaction as failed. err={:?}", e))?;
260        if count > 0 {
261            Ok(())
262        } else {
263            Err(LogErr::TransactionNotFound)
264        }
265    }
266
267    #[instrument(skip(self))]
268    async fn transaction_pass(&mut self, t_id: u64) -> Result<(), LogErr> {
269        debug!("Marking transaction: t_id={}, passed ", t_id);
270        let timefail = self.timefail.is_timefail();
271        // if it wraps, that is fine.
272        #[allow(clippy::cast_possible_wrap)]
273        let t_id = t_id as i64;
274        let count = self
275            .ds
276            .transaction_pass(t_id, timefail)
277            .await
278            .inspect_err(|e| error!("Failed to mark transaction as passed. err={:?}", e))?;
279        if count > 0 {
280            Ok(())
281        } else {
282            Err(LogErr::TransactionNotFound)
283        }
284    }
285
286    #[instrument(skip(self))]
287    async fn prepare_datapoints(&mut self, maximum: u32) -> Result<Vec<PreparedPoint>, LogErr> {
288        prepare_range_check(maximum)?;
289        let data = self
290            .ds
291            .get_batch(maximum)
292            .await
293            .inspect_err(|e| error!("Failed to get batch of datapoints. err={:?}", e))?;
294        let result: Vec<PreparedPoint> = data.into_iter().map(PreparedPoint::from).collect();
295        Ok(result)
296    }
297
298    #[instrument(skip(self))]
299    async fn prepare_modio_datapoints(
300        &mut self,
301        maximum: u32,
302    ) -> Result<Vec<PreparedPoint>, LogErr> {
303        prepare_range_check(maximum)?;
304        let data = self
305            .ds
306            .get_internal_batch(maximum)
307            .await
308            .inspect_err(|e| error!("Failed to get_internal_batch. err={:?}", e))?;
309        let result: Vec<PreparedPoint> = data.into_iter().map(PreparedPoint::from).collect();
310        Ok(result)
311    }
312
313    #[instrument(skip_all)]
314    async fn remove_prepared(&mut self, items: Vec<i64>) -> Result<(), LogErr> {
315        prepare_remove_check(&items)?;
316        self.ds
317            .drop_batch(&items)
318            .await
319            .inspect_err(|e| error!("Failed to delete batch of submitted items. err={:?}", e))?;
320        Ok(())
321    }
322}
323
324/// Check of incoming values to be removed, Returns error on invalid items
325fn prepare_remove_check(items: &[i64]) -> Result<(), PreparedError> {
326    if items.is_empty() {
327        return Err(PreparedError::Empty);
328    }
329    if items.iter().any(|x| *x < 0_i64) {
330        return Err(PreparedError::InvalidIndex);
331    }
332    Ok(())
333}
334
335#[derive(thiserror::Error, Debug)]
336pub enum PreparedError {
337    #[error("Too few items")]
338    TooSmall,
339    #[error("Too many items")]
340    TooMany,
341    #[error("Empty item set")]
342    Empty,
343    #[error("Invalid index")]
344    InvalidIndex,
345}
346
347// Converting From a Prepared Error to LogError (which is used by DBus layer)
348impl From<PreparedError> for LogErr {
349    fn from(e: PreparedError) -> Self {
350        Self::InvalidPrepared(e.to_string())
351    }
352}
353
354/// Range check helper for `prepare_datapoints` and `prepare_modio_datapoints`
355///
356/// Errors:
357///    `PreparedError` with a specific error item.
358const fn prepare_range_check(num: u32) -> Result<(), PreparedError> {
359    match num {
360        0 => Err(PreparedError::TooSmall),
361        1..=250 => Ok(()),
362        _ => Err(PreparedError::TooMany),
363    }
364}
365
366#[cfg(test)]
367pub mod tests {
368    use super::*;
369
370    use crate::testing::{Tempbase, TestServer};
371    use fsipc::unixtime;
372    use std::error::Error;
373    use test_log::test;
374    use timeout_macro::timeouttest;
375
376    type TestResult = Result<(), Box<dyn Error>>;
377
378    use futures_util::{FutureExt, StreamExt};
379
380    #[test(timeouttest)]
381    async fn ping_pong_test() -> TestResult {
382        let server = TestServer::new(line!()).await?;
383        let proxy = server.proxy().await?;
384        let first = proxy.ping().await?;
385        let second = proxy.ping().await?;
386        assert_eq!(first, "Ping? Pong");
387        assert_eq!(second, "Ping? Pong");
388        Ok(())
389    }
390
391    #[test(timeouttest)]
392    async fn done_gives_error_test() -> TestResult {
393        let server = TestServer::new(line!()).await?;
394        let proxy = server.proxy().await?;
395        let res = proxy.done().await;
396        assert!(res.is_err());
397        Ok(())
398    }
399
400    #[test(timeouttest)]
401    async fn store_retrieve() -> TestResult {
402        let server = TestServer::new(line!()).await?;
403        let proxy = server.proxy().await?;
404        proxy.store("test.key", "abc123").await?;
405        let m = proxy.retrieve("test.key").await?;
406        assert_eq!(m.key, "test.key");
407        assert_eq!(m.value, "abc123");
408        Ok(())
409    }
410
411    #[test(timeouttest)]
412    async fn store_buffer() -> TestResult {
413        let server = TestServer::new(line!()).await?;
414        let proxy = server.proxy().await?;
415        proxy
416            .store_with_time("test.key", "abc123", 1_494_602_107)
417            .await?;
418        proxy
419            .store_with_time("test.key", "abc1234", 1_494_602_108)
420            .await?;
421        let m = proxy.retrieve("test.key").await?;
422        assert_eq!(m.key, "test.key");
423        assert_eq!(m.value, "abc1234");
424        assert_eq!(m.timestamp, 1_494_602_108);
425        Ok(())
426    }
427
428    #[test(timeouttest)]
429    async fn retrieve_all_test() -> TestResult {
430        let server = TestServer::new(line!()).await?;
431        let proxy = server.proxy().await?;
432        debug!("I have a party here");
433        proxy
434            .store_with_time("test.key", "abc123", 1_494_602_107)
435            .await?;
436        proxy
437            .store_with_time("test.key", "abc1234", 1_494_602_108)
438            .await?;
439        proxy.store("test.key2", "abcdefg").await?;
440
441        let all = proxy.retrieve_all().await?;
442        assert_eq!(all.len(), 2);
443        let m0 = all.first().expect("Should have value");
444        assert_eq!(m0.key, "test.key");
445        assert_eq!(m0.value, "abc1234");
446        assert_eq!(m0.timestamp, 1_494_602_108);
447        let m1 = all.get(1).expect("Should have value");
448        assert_eq!(m1.key, "test.key2");
449        assert_eq!(m1.value, "abcdefg");
450        Ok(())
451    }
452
453    #[test(timeouttest)]
454    async fn transaction_adding_test() -> TestResult {
455        let server = TestServer::new(line!()).await?;
456        let proxy = server.proxy().await?;
457        proxy
458            .transaction_add("test.test.one", "first", "second", "012")
459            .await?;
460        proxy
461            .transaction_add("dummy.test.one", "should not", "be present", "013")
462            .await?;
463        let transactions = proxy.transaction_get("test.test").await?;
464        assert_eq!(transactions.len(), 1);
465        let res = &transactions[0];
466        assert_eq!(res.key, "test.test.one");
467        assert_eq!(res.t_id, 1, "Transaction ID mismatch");
468        Ok(())
469    }
470
471    #[test(timeouttest)]
472    async fn transaction_dupe_adding_test() -> TestResult {
473        let server = TestServer::new(line!()).await?;
474        let proxy = server.proxy().await?;
475        proxy
476            .transaction_add("test.test.one", "first", "second", "1638290048")
477            .await?;
478        let res = proxy
479            .transaction_add("test.test.one", "first", "second", "1638290048")
480            .await;
481        res.expect("duplicated tokens should not cause error");
482        let transactions = proxy.transaction_get("test.test").await?;
483        assert_eq!(transactions.len(), 1);
484        Ok(())
485    }
486
487    #[test(timeouttest)]
488    async fn transaction_signal_test() -> TestResult {
489        let server = TestServer::new(line!()).await?;
490        let logger = server.proxy().await?;
491        let mut stream = logger.receive_transaction_added().await?;
492        logger
493            .transaction_add("test.test.transaction_signal", "first", "second", "012")
494            .await?;
495
496        let signal = stream.next().await.unwrap();
497        let payload = signal.args()?;
498        assert_eq!(payload.key, "test.test.transaction_signal");
499        Ok(())
500    }
501
502    #[test(timeouttest)]
503    async fn transaction_passing_test() -> TestResult {
504        let server = TestServer::new(line!()).await?;
505        let logger = server.proxy().await?;
506        logger
507            .transaction_add("test.test.one", "first", "second", "012")
508            .await?;
509        logger
510            .transaction_add("test.test.two", "uno", "dos", "0113")
511            .await?;
512        let trans = logger.transaction_get("test.test").await?;
513        logger.transaction_fail(trans[0].t_id).await?;
514        logger.transaction_pass(trans[1].t_id).await?;
515        logger
516            .transaction_add("test.test.three", "etta", "tvåa", "0114")
517            .await?;
518        let transactions = logger.transaction_get("test.test").await?;
519        assert_eq!(transactions.len(), 1);
520        let res = &transactions[0];
521        assert_eq!(res.key, "test.test.three");
522        assert_eq!(res.t_id, 3, "Transaction id mismatch");
523        Ok(())
524    }
525
526    #[test(timeouttest)]
527    async fn retrieving_data_test() -> TestResult {
528        let server = TestServer::new(line!()).await?;
529        let ipc = server.proxy().await?;
530        ipc.store("test.test.one", "first").await?;
531        ipc.store("test.test.one", "second").await?;
532        ipc.store("test.test.one", "third").await?;
533
534        ipc.store("test.test.two", "1").await?;
535        ipc.store("test.test.two", "2").await?;
536        ipc.store("test.test.two", "3").await?;
537
538        let res = ipc.retrieve_all().await?;
539        for measure in &res {
540            let data = ipc.retrieve(&measure.key).await?;
541            assert_eq!(data.key, measure.key);
542        }
543        Ok(())
544    }
545
546    #[test(timeouttest)]
547    async fn valid_key_test() -> TestResult {
548        let server = TestServer::new(line!()).await?;
549        let ipc = server.proxy().await?;
550        assert!(ipc.valid_key("modio.software.development").await?);
551        assert!(ipc.valid_key("abc").await?);
552        assert!(ipc.valid_key("a.b.c").await?);
553        assert!(ipc.valid_key("a_b.c").await?);
554        Ok(())
555    }
556
557    #[test(timeouttest)]
558    async fn invalid_key_test() -> TestResult {
559        let server = TestServer::new(line!()).await?;
560        let ipc = server.proxy().await?;
561        assert!(!ipc.valid_key("modio..invalid").await?);
562        assert!(!ipc.valid_key(".modio..invalid").await?);
563        assert!(!ipc.valid_key("modio.invalid.").await?);
564        assert!(!ipc.valid_key("modio. invalid").await?);
565        assert!(!ipc.valid_key("modio.in valid").await?);
566        assert!(!ipc.valid_key("modio.invalid ").await?);
567        assert!(!ipc.valid_key(" modio.invalid").await?);
568        Ok(())
569    }
570
571    #[test(timeouttest)]
572    async fn transaction_double() -> TestResult {
573        let server = TestServer::new(line!()).await?;
574        let ipc = server.proxy().await?;
575        let key = "test.test.one";
576        let first = "first";
577        let second = "second";
578        let guid4 = zbus::Guid::generate();
579        // Store the key
580        ipc.store(key, first).await?;
581
582        ipc.transaction_add(key, first, second, guid4.as_str())
583            .await?;
584        let transactions = ipc.transaction_get(key).await?;
585        let first_transaction = transactions
586            .first()
587            .expect("Should have at least one transaction");
588        let res = ipc.transaction_pass(first_transaction.t_id).await;
589        assert!(res.is_ok());
590        let res = ipc.transaction_pass(first_transaction.t_id).await;
591        assert!(res.is_err());
592        Ok(())
593    }
594
595    #[test(timeouttest)]
596    async fn transaction_tests() -> TestResult {
597        let server = TestServer::new(line!()).await?;
598        let ipc = server.proxy().await?;
599        let key = "test.test.one";
600        let first = "first";
601        let mut our_value = "first";
602        let second = "second";
603
604        ipc.store(key, our_value).await?;
605        let when = unixtime();
606
607        ipc.store_with_time(key, our_value, when).await?;
608
609        let guid1 = zbus::Guid::generate(); // Transactions "id" are either a UUID or an timestamp-as-string
610        ipc.transaction_add(key, first, second, guid1.as_str())
611            .await?;
612        let guid2 = zbus::Guid::generate();
613        ipc.transaction_add(key, first, second, guid2.as_str())
614            .await?;
615        let guid3 = zbus::Guid::generate();
616        ipc.transaction_add(key, first, second, guid3.as_str())
617            .await?;
618
619        let transactions = ipc.transaction_get(key).await?;
620        // The "id" here is an internal ID in the current logger instance, not the same as
621        // our GUID ID we entered above.
622        for trn in &transactions {
623            if trn.key == key {
624                if our_value == trn.expected {
625                    our_value = &trn.target;
626                    ipc.transaction_pass(trn.t_id).await?;
627                } else {
628                    ipc.transaction_fail(trn.t_id).await?;
629                }
630            }
631            ipc.store(key, our_value).await?;
632        }
633        // Transaction  get should _succeed_ but return empty data.
634        let res = ipc.transaction_get(key).await?;
635        assert_eq!(res.len(), 0);
636
637        Ok(())
638    }
639
640    #[test(timeouttest)]
641    async fn no_modio_signals() -> TestResult {
642        let server = TestServer::new(line!()).await?;
643        let ipc = server.proxy().await?;
644        let mut stream = ipc.receive_store_signal().await?;
645        // Send a transactiona
646        ipc.store("test.test.test", "value").await?;
647        let first = stream.next().await.unwrap();
648        let payload = first.args()?;
649        assert_eq!(payload.key, "test.test.test");
650
651        // A store to a "modio." prefix key should not end up in a signal
652        // Therefore we should have an empty result here.
653        ipc.store("modio.test.test", "value").await?;
654        let second = stream.next().now_or_never();
655        assert!(second.is_none());
656        Ok(())
657    }
658
659    #[test(timeouttest)]
660    async fn submit_consume() -> TestResult {
661        let server = TestServer::new(line!()).await?;
662        let ipc = server.proxy().await?;
663        for x in 0..5 {
664            ipc.store_with_time("test.foo", &x.to_string(), unixtime() - 100)
665                .await?;
666        }
667        let vals = ipc.prepare_datapoints(10).await?;
668        assert_eq!(vals.len(), 5);
669
670        let point = &vals[0];
671        assert_eq!(point.key, "test.foo");
672        assert_eq!(point.value, "0");
673
674        let more = ipc.prepare_datapoints(10).await?;
675        // Should be available still
676        assert_eq!(more.len(), 5);
677
678        let last = &vals[4];
679        assert_eq!(last.key, "test.foo");
680        assert_eq!(last.value, "4");
681
682        let to_remove: Vec<_> = vals.iter().map(|m| m.id).collect();
683
684        ipc.remove_prepared(to_remove).await?;
685        let after = ipc.prepare_datapoints(30).await?;
686        assert!(after.is_empty());
687        Ok(())
688    }
689
690    #[test(timeouttest)]
691    async fn submit_modio_consume() -> TestResult {
692        let server = TestServer::new(line!()).await?;
693        let ipc = server.proxy().await?;
694        for x in 0..5 {
695            ipc.store_with_time("test.foo", &x.to_string(), unixtime() - 100)
696                .await?;
697            ipc.store_with_time("modio.test.foo", &x.to_string(), unixtime() - 100)
698                .await?;
699        }
700
701        let vals = ipc.prepare_modio_datapoints(20).await?;
702        assert_eq!(vals.len(), 5);
703        for point in &vals {
704            assert_eq!(&point.key, "modio.test.foo");
705        }
706
707        let more = ipc.prepare_modio_datapoints(10).await?;
708        // Should be available still
709        assert_eq!(more.len(), 5);
710
711        // Figure out how to remove the ID's here
712        //
713        let to_remove: Vec<i64> = vals.iter().map(|x| x.id).collect();
714        ipc.remove_prepared(to_remove).await?;
715
716        let modio_after = ipc.prepare_modio_datapoints(30).await?;
717        assert!(modio_after.is_empty());
718
719        let after = ipc.prepare_datapoints(30).await?;
720        assert!(!after.is_empty());
721        Ok(())
722    }
723
724    // Prepare datapoints should cause a write to disk only if the buffer is eitheer full or too
725    // old, as for how the database considers things.
726    #[test(timeouttest)]
727    async fn test_get_batch_expiry() -> TestResult {
728        let server = TestServer::new(line!()).await?;
729        let ipc = server.proxy().await?;
730        for x in 0..25 {
731            ipc.store("test.foo", &x.to_string()).await?;
732        }
733        let vals = ipc.prepare_datapoints(10).await?;
734        assert_eq!(
735            vals.len(),
736            0,
737            "Data should not automatically be flushed to disk"
738        );
739
740        ipc.store_with_time("test.foo", "26", unixtime() - 200)
741            .await?;
742        let vals = ipc.prepare_datapoints(10).await?;
743        assert_eq!(
744            vals.len(),
745            10,
746            "Oldest data in buffer should cause a flush to disk"
747        );
748        Ok(())
749    }
750
751    // Prepare (internal) datapoints should cause a write to disk only if the buffer is either full
752    // or too old, as for how the database considers things.
753    #[test(timeouttest)]
754    async fn test_internal_get_batch_expiry() -> TestResult {
755        let server = TestServer::new(line!()).await?;
756        let ipc = server.proxy().await?;
757        for x in 0..25 {
758            ipc.store("modio.test.foo", &x.to_string()).await?;
759            ipc.store("test.foo", &x.to_string()).await?;
760        }
761        let vals = ipc.prepare_modio_datapoints(10).await?;
762        assert_eq!(
763            vals.len(),
764            0,
765            "Data should not automatically be flushed to disk"
766        );
767
768        ipc.store_with_time("modio.test.foo", "26", unixtime() - 200)
769            .await?;
770        let vals = ipc.prepare_modio_datapoints(10).await?;
771        assert_eq!(
772            vals.len(),
773            10,
774            "Oldest data in buffer should cause a flush to disk"
775        );
776        Ok(())
777    }
778
779    #[test(timeouttest)]
780    async fn test_get_batch() -> TestResult {
781        let server = TestServer::new(line!()).await?;
782        let ipc = server.proxy().await?;
783        for x in 0..25 {
784            ipc.store_with_time("test.foo", &x.to_string(), unixtime() - 200)
785                .await?;
786        }
787
788        let vals = ipc.prepare_datapoints(10).await?;
789        assert_eq!(vals.len(), 10);
790        let point = &vals[0];
791        assert_eq!(point.key, "test.foo");
792        assert_eq!(point.value, "0");
793
794        let to_remove: Vec<i64> = vals.iter().map(|x| x.id).collect();
795        ipc.remove_prepared(to_remove).await?;
796
797        // Requst more than we have left, should return only the 25-10=15 items
798        let vals = ipc.prepare_datapoints(30).await?;
799        assert_eq!(vals.len(), 15);
800        let point = &vals[0];
801        assert_eq!(point.key, "test.foo");
802        assert_eq!(point.value, "10");
803
804        let to_remove: Vec<i64> = vals.iter().map(|x| x.id).collect();
805        ipc.remove_prepared(to_remove).await?;
806
807        let vals = ipc.prepare_datapoints(30).await?;
808        assert!(vals.is_empty());
809        Ok(())
810    }
811
812    #[test(timeouttest)]
813    /// Technically, this test might fail if you have an ethernet interface named "wan"
814    /// I'm ok with that for now as that is unlikely to happen, if it does, disable the test, or
815    /// check it as something better
816    async fn test_get_mac() -> TestResult {
817        let server = TestServer::new(line!()).await?;
818        let logger = server.proxy().await?;
819        let res = logger.get_boxid().await?;
820        assert_eq!(res, "000000000000");
821        Ok(())
822    }
823
824    #[test(timeouttest)]
825    async fn test_empty_transactions() -> TestResult {
826        let server = TestServer::new(line!()).await?;
827        let ipc = server.proxy().await?;
828        let transactions = ipc.transaction_get("test").await?;
829        assert_eq!(transactions.len(), 0);
830        let values = ipc.retrieve_all().await?;
831        assert_eq!(values.len(), 0);
832        Ok(())
833    }
834
835    #[test(timeouttest)]
836    async fn test_transaction_get_prefix() -> TestResult {
837        let server = TestServer::new(line!()).await?;
838        let ipc = server.proxy().await?;
839        let transactions = ipc.transaction_get("mbus.").await?;
840        assert_eq!(transactions.len(), 0);
841        Ok(())
842    }
843
844    // This test is speshul.
845    // It dupes the logic for the temp server in order to start it several times and compare
846    // results between executions, in order to make sure that temp data (ram only) is accessible
847    // later.
848    #[test(timeouttest)]
849    async fn resume_database_test() -> TestResult {
850        let base = Tempbase::default();
851        let dbfile = Tempbase::dbfile();
852        let name = format!("se.modio.logger.TestResumeDb{}", line!());
853
854        let expected = vec![
855            ("test.test.one", "1"),
856            ("test.test.two", "2"),
857            ("test.test.three", "3"),
858            ("test.test.four", "4"),
859            ("test.test.five", "5"),
860            ("test.test.six", "6"),
861        ];
862
863        // Spawn server, store some data and then shut it down
864        {
865            let name = name.clone();
866            let base = base.clone();
867            let dbfile = dbfile.clone();
868            let server = TestServer::new_with_base(name, base, dbfile).await?;
869            let logger = server.proxy().await?;
870            info!("Filling datastore");
871            for (key, val) in &expected {
872                logger.store(key, val).await?;
873            }
874            // Read the data to ensure we have it in storage
875            for (key, value) in &expected {
876                let data = logger.retrieve(key).await?;
877                assert_eq!(&data.value, value);
878                assert_eq!(&data.key, key);
879            }
880            info!("Done, settling things");
881        };
882
883        // Spawn another server, retrieving the previous data and ensure it is the same.
884        {
885            let server = TestServer::new_with_base(name, base, dbfile).await?;
886            let logger = server.proxy().await?;
887            info!("Checking content after re-start");
888            for (key, value) in &expected {
889                let data = logger.retrieve(key).await?;
890                assert_eq!(&data.value, value);
891                assert_eq!(&data.key, key);
892            }
893        }
894        Ok(())
895    }
896
897    // This is a big test-case that covers multiple concurrent reader-writers while running
898    // maintenance jobs at the same time.
899    // The test triggers load problems with the system if there are too few DB connections, and
900    // more.
901    #[allow(clippy::assertions_on_constants)]
902    #[test(timeouttest)]
903    async fn load_and_maintain() -> TestResult {
904        let server = TestServer::new(line!()).await?;
905        let ipc = server.proxy().await?;
906        let logger1 = server.logger1().await?;
907        let submit1 = server.submit1().await?;
908        use crate::testing::proxytest::{Logger1Proxy, Submit1Proxy};
909        use crate::LOGGER_PATH;
910        use async_std::task::{sleep, spawn};
911        use fsipc::legacy::fsipcProxy;
912        use futures::try_join;
913        use std::collections::HashMap;
914        use std::sync::atomic::{AtomicBool, Ordering};
915        use std::sync::Arc;
916        use std::time::Duration;
917
918        // Atomic flag that the tasks check before writing, prevents them from causing trouble once
919        // we reach shutdown
920        let stop = Arc::new(AtomicBool::new(false));
921        // Amount of keys to have traffic for that are using "one write per key" mode.
922        const NUM_SLOW_KEYS: usize = 1000;
923        const NUM_TRANSACTIONS: usize = 50;
924        assert!(
925            NUM_TRANSACTIONS < NUM_SLOW_KEYS,
926            "Transactions work on slow keys."
927        );
928        const NUM_BULK_KEYS: usize = 2000;
929        // How many items the submitter will consume each loop
930        const NUM_SUBMIT_DATA: usize = 200;
931
932        // How many loops to run for each of maintenance / clean until we are done
933        // The test will end after either of these reach their limit, or something fails.
934        const NUM_CLEAN_LOOPS: usize = 50;
935        const NUM_MAINT_LOOPS: usize = 50;
936
937        // 1 task to generate "trickle" of data, using ipc.store with delays.
938        //  ( also sets metadata for each iteration )
939        async fn trickle_gen(
940            stop: Arc<AtomicBool>,
941            ipc: fsipcProxy<'static>,
942            logger1: Logger1Proxy<'static>,
943        ) -> zbus::Result<()> {
944            loop {
945                for n in 0..NUM_SLOW_KEYS {
946                    let key = format!("test.trickle.key.{n}");
947                    let name = format!("Name of tricke test key #{n}");
948                    let desc = format!("Description {n} of trickle test key #{n}");
949                    let val = format!("{n}");
950                    logger1.set_metadata_name(&key, &name).await?;
951                    logger1.set_metadata_description(&key, &desc).await?;
952                    ipc.store(&key, &val).await?;
953                }
954                if stop.load(Ordering::Relaxed) {
955                    break Ok(());
956                }
957            }
958        }
959        // 1 task to generate "bulk" of data, using logger1.store func
960        async fn bulk_gen(
961            stop: Arc<AtomicBool>,
962            logger1: Logger1Proxy<'static>,
963        ) -> zbus::Result<()> {
964            let delay = Duration::from_millis(1);
965            loop {
966                let mut map = HashMap::with_capacity(NUM_BULK_KEYS);
967                for n in 0..=NUM_BULK_KEYS {
968                    let key = format!("test.bulk.key.{n}");
969                    let val = format!("{n}");
970                    map.insert(key, val.into());
971                }
972                logger1.store_batch(map).await?;
973                sleep(delay).await;
974                if stop.load(Ordering::Relaxed) {
975                    break Ok(());
976                }
977            }
978        }
979
980        // 1 task to pretend to submit, calling "get all metadata" and then "prepare batch"
981        async fn submit_gen(
982            stop: Arc<AtomicBool>,
983            ipc: fsipcProxy<'static>,
984            submit1: Submit1Proxy<'static>,
985        ) -> zbus::Result<()> {
986            let delay = Duration::from_millis(5);
987            loop {
988                if stop.load(Ordering::Relaxed) {
989                    break Ok(());
990                }
991                let _meta = submit1.get_all_metadata().await?;
992                let dp = ipc.prepare_datapoints(NUM_SUBMIT_DATA as u32).await?;
993                let to_remove: Vec<_> = dp.iter().map(|m| m.id).collect();
994                if to_remove.is_empty() {
995                    sleep(delay).await;
996                } else {
997                    ipc.remove_prepared(to_remove).await?;
998                }
999            }
1000        }
1001
1002        // 1 task to poll keys and add transactions
1003        async fn tran_gen(stop: Arc<AtomicBool>, ipc: fsipcProxy<'static>) -> zbus::Result<()> {
1004            let delay = Duration::from_millis(5);
1005            let mut t_id = 0;
1006            loop {
1007                // Sleep first to give the trickle code a chance to add measures. Doing it that way
1008                // cuts down on errors.
1009                sleep(delay).await;
1010                if stop.load(Ordering::Relaxed) {
1011                    break Ok(());
1012                }
1013                for n in 0..=NUM_TRANSACTIONS {
1014                    t_id += 1;
1015                    let transaction_id = format!("{t_id}");
1016                    let key = format!("test.trickle.key.{n}");
1017                    // No datapoint exists yet, that is not a problem for this code
1018                    if let Ok(val) = ipc.retrieve(&key).await {
1019                        ipc.transaction_add(&val.key, &val.value, "0", &transaction_id)
1020                            .await?;
1021                    }
1022                }
1023            }
1024        }
1025
1026        async fn maint_task(stop: Arc<AtomicBool>, conn: zbus::Connection) -> zbus::Result<()> {
1027            let delay = Duration::from_millis(5);
1028            use crate::LOGGER_PATH;
1029            let iface_ref = conn
1030                .object_server()
1031                .interface::<_, Logger>(LOGGER_PATH)
1032                .await?;
1033            for _ in 0..=NUM_MAINT_LOOPS {
1034                sleep(delay).await;
1035                if stop.load(Ordering::Relaxed) {
1036                    return Ok(());
1037                }
1038                iface_ref
1039                    .get_mut()
1040                    .await
1041                    .periodic()
1042                    .await
1043                    // Since the proxies return a zbus::Error<()> and we want to use try_join below, we
1044                    // need to return the same kind of error, and I don't feel like implementing the
1045                    // conversions just for some test-cases.
1046                    .map_err(|e| zbus::Error::Failure(e.to_string()))?;
1047            }
1048            stop.store(true, Ordering::Relaxed);
1049            Ok(())
1050        }
1051
1052        async fn clean_task(stop: Arc<AtomicBool>, conn: zbus::Connection) -> zbus::Result<()> {
1053            let delay = Duration::from_millis(5);
1054            use crate::LOGGER_PATH;
1055            let iface_ref = conn
1056                .object_server()
1057                .interface::<_, Logger>(LOGGER_PATH)
1058                .await?;
1059            let pool = iface_ref.get_mut().await.ds().pool();
1060            for _ in 0..=NUM_CLEAN_LOOPS {
1061                sleep(delay).await;
1062                if stop.load(Ordering::Relaxed) {
1063                    return Ok(());
1064                }
1065                Datastore::clean_maintenance(pool.clone())
1066                    .await
1067                    .map_err(|e| zbus::Error::Failure(e.to_string()))?;
1068            }
1069            stop.store(true, Ordering::Relaxed);
1070            Ok(())
1071        }
1072
1073        let trickle = spawn(trickle_gen(stop.clone(), ipc.clone(), logger1.clone()));
1074        let bulk = spawn(bulk_gen(stop.clone(), logger1.clone()));
1075        let submit = spawn(submit_gen(stop.clone(), ipc.clone(), submit1.clone()));
1076        let trans = spawn(tran_gen(stop.clone(), ipc.clone()));
1077        let maint = spawn(maint_task(stop.clone(), server.conn.clone()));
1078        let cleant = spawn(clean_task(stop.clone(), server.conn.clone()));
1079        let iface_ref = server
1080            .conn
1081            .object_server()
1082            .interface::<_, Logger>(LOGGER_PATH)
1083            .await?;
1084        /* Artificially  drain a few connetions from our pool. default pool is 4, so we steal 2
1085         * more. 1 for maintenance and one for the pool. */
1086        let _conn4 = iface_ref.get_mut().await.ds().pool().acquire().await?;
1087        let _conn5 = iface_ref.get_mut().await.ds().pool().acquire().await?;
1088        try_join!(trickle, bulk, submit, trans, maint, cleant)?;
1089        stop.store(true, Ordering::Relaxed);
1090
1091        Ok(())
1092    }
1093}