memfault_ssf/
shared_service_thread.rs

1//
2// Copyright (c) Memfault, Inc.
3// See License.txt for details
4use std::{
5    any::TypeId,
6    borrow::BorrowMut,
7    sync::{mpsc::Receiver, Arc, Mutex},
8    thread::spawn,
9};
10
11use crate::{Envelope, Mailbox, Service, ShutdownServiceMessage, StatsAggregator};
12
13/// This runs a service into a thread but, unlike `ServiceThread`, it will use
14/// an `Arc<Mutex<S>>` so that the service object is also available as shared
15/// memory.
16///
17/// This is mostly here for backwards compatibility and to make adoption easier:
18/// start by using `SharedServiceThread` and when all usage of the shared memory
19/// have been removed, switch to `ServiceThread` for a "pure actor".
20pub struct SharedServiceThread<S: Service> {
21    mailbox: Mailbox<S>,
22    service: Arc<Mutex<S>>,
23}
24
25impl<S: Service + Send + 'static> SharedServiceThread<S> {
26    pub fn spawn_with(service: S) -> Self {
27        let (mailbox, receiver) = Mailbox::create();
28        let shared_service = Arc::new(Mutex::new(service));
29        {
30            let shared_service = shared_service.clone();
31            let _handle = spawn(move || SharedServiceThread::run(shared_service, receiver));
32        }
33
34        SharedServiceThread {
35            /* handle, */ mailbox,
36            service: shared_service,
37        }
38    }
39
40    pub fn mbox(&self) -> Mailbox<S> {
41        self.mailbox.clone()
42    }
43
44    pub fn run(
45        service: Arc<Mutex<S>>,
46        receiver: Receiver<Envelope<S>>,
47    ) -> Result<StatsAggregator, &'static str> {
48        let mut stats_aggregator = StatsAggregator::new();
49        for mut envelope in receiver {
50            let type_id = envelope.message_type_id();
51            match service.lock().borrow_mut() {
52                Ok(service) => match envelope.deliver_to(service) {
53                    Err(_e) => {
54                        // Delivery failed - probably "attempt to deliver twice" - should never happen.
55                        return Err("delivery failed");
56                    }
57                    Ok(stats) => {
58                        stats_aggregator.add(&stats);
59                    }
60                },
61                Err(_) => {
62                    return Err("Shared mutex is poisoned. Shutting down.");
63                }
64            }
65            if type_id == Some(TypeId::of::<ShutdownServiceMessage>()) {
66                break;
67            }
68        }
69        Ok(stats_aggregator)
70    }
71
72    pub fn shared(&self) -> Arc<Mutex<S>> {
73        self.service.clone()
74    }
75}