Skip to main content

whatsapp_rust/store/
persistence_manager.rs

1use super::error::{StoreError, db_err};
2use crate::store::Device;
3use crate::store::traits::Backend;
4use async_lock::RwLock;
5use event_listener::Event;
6use futures::FutureExt;
7use log::{debug, error};
8use std::sync::Arc;
9use std::sync::atomic::{AtomicBool, Ordering};
10use std::time::Duration;
11use wacore::runtime::Runtime;
12use wacore_binary::jid::Jid;
13
14pub struct PersistenceManager {
15    device: Arc<RwLock<Device>>,
16    backend: Arc<dyn Backend>,
17    dirty: Arc<AtomicBool>,
18    save_notify: Arc<Event>,
19}
20
21impl PersistenceManager {
22    /// Create a PersistenceManager with a backend implementation.
23    ///
24    /// Note: The backend should already be configured with the correct device_id
25    /// (via SqliteStore::new_for_device for multi-account scenarios).
26    pub async fn new(backend: Arc<dyn Backend>) -> Result<Self, StoreError> {
27        debug!("PersistenceManager: Ensuring device row exists.");
28        // Ensure a device row exists for this backend's device_id; create it if not.
29        let exists = backend.exists().await.map_err(db_err)?;
30        if !exists {
31            debug!("PersistenceManager: No device row found. Creating new device row.");
32            let id = backend.create().await.map_err(db_err)?;
33            debug!("PersistenceManager: Created device row with id={id}.");
34        }
35
36        debug!("PersistenceManager: Attempting to load device data via Backend.");
37        let device_data_opt = backend.load().await.map_err(db_err)?;
38
39        let device = if let Some(serializable_device) = device_data_opt {
40            debug!(
41                "PersistenceManager: Loaded existing device data (PushName: '{}'). Initializing Device.",
42                serializable_device.push_name
43            );
44            let mut dev = Device::new(backend.clone());
45            dev.load_from_serializable(serializable_device);
46            dev
47        } else {
48            debug!("PersistenceManager: No data yet; initializing default Device in memory.");
49            Device::new(backend.clone())
50        };
51
52        Ok(Self {
53            device: Arc::new(RwLock::new(device)),
54            backend,
55            dirty: Arc::new(AtomicBool::new(false)),
56            save_notify: Arc::new(Event::new()),
57        })
58    }
59
60    pub async fn get_device_arc(&self) -> Arc<RwLock<Device>> {
61        self.device.clone()
62    }
63
64    pub async fn get_device_snapshot(&self) -> Device {
65        self.device.read().await.clone()
66    }
67
68    pub fn backend(&self) -> Arc<dyn Backend> {
69        self.backend.clone()
70    }
71
72    pub async fn modify_device<F, R>(&self, modifier: F) -> R
73    where
74        F: FnOnce(&mut Device) -> R,
75    {
76        let mut device_guard = self.device.write().await;
77        let result = modifier(&mut device_guard);
78
79        self.dirty.store(true, Ordering::Relaxed);
80        self.save_notify.notify(1);
81
82        result
83    }
84
85    /// Flush any dirty device state to the backend immediately.
86    pub async fn flush(&self) -> Result<(), StoreError> {
87        self.save_to_disk().await
88    }
89
90    async fn save_to_disk(&self) -> Result<(), StoreError> {
91        if self.dirty.swap(false, Ordering::AcqRel) {
92            debug!("Device state is dirty, saving to disk.");
93            let device_guard = self.device.read().await;
94            let serializable_device = device_guard.to_serializable();
95            drop(device_guard);
96
97            if let Err(e) = self.backend.save(&serializable_device).await {
98                // Restore dirty flag so the next tick retries the save
99                self.dirty.store(true, Ordering::Release);
100                return Err(db_err(e));
101            }
102            debug!("Device state saved successfully.");
103        }
104        Ok(())
105    }
106
107    /// Triggers a snapshot of the underlying storage backend.
108    /// Useful for debugging critical errors like crypto state corruption.
109    pub async fn create_snapshot(
110        &self,
111        name: &str,
112        extra_content: Option<&[u8]>,
113    ) -> Result<(), StoreError> {
114        #[cfg(feature = "debug-snapshots")]
115        {
116            // Ensure pending changes are saved first
117            self.save_to_disk().await?;
118            self.backend
119                .snapshot_db(name, extra_content)
120                .await
121                .map_err(db_err)
122        }
123        #[cfg(not(feature = "debug-snapshots"))]
124        {
125            let _ = name;
126            let _ = extra_content;
127            log::warn!("Snapshot requested but 'debug-snapshots' feature is disabled");
128            Ok(())
129        }
130    }
131
132    pub fn run_background_saver(self: Arc<Self>, runtime: Arc<dyn Runtime>, interval: Duration) {
133        let rt = runtime.clone();
134        let weak = Arc::downgrade(&self);
135        drop(self); // Release the strong reference; the caller's Arc keeps it alive
136        runtime
137            .spawn(Box::pin(async move {
138                loop {
139                    let Some(this) = weak.upgrade() else {
140                        debug!("PersistenceManager dropped, exiting background saver.");
141                        return;
142                    };
143                    // Create the listener BEFORE the event can fire to avoid missing notifications.
144                    let listener = this.save_notify.listen();
145                    drop(this); // Don't hold strong ref while sleeping
146
147                    futures::select! {
148                        _ = listener.fuse() => {
149                            debug!("Save notification received.");
150                        }
151                        _ = rt.sleep(interval).fuse() => {}
152                    }
153
154                    let Some(this) = weak.upgrade() else {
155                        debug!("PersistenceManager dropped, exiting background saver.");
156                        return;
157                    };
158                    if let Err(e) = this.save_to_disk().await {
159                        error!("Error saving device state in background: {e}");
160                    }
161                }
162            }))
163            .detach();
164        debug!("Background saver task started with interval {interval:?}");
165    }
166}
167
168use super::commands::{DeviceCommand, apply_command_to_device};
169
170impl PersistenceManager {
171    pub async fn process_command(&self, command: DeviceCommand) {
172        self.modify_device(|device| {
173            apply_command_to_device(device, command);
174        })
175        .await;
176    }
177}
178
179impl PersistenceManager {
180    pub async fn get_skdm_recipients(&self, group_jid: &str) -> Result<Vec<Jid>, StoreError> {
181        self.backend
182            .get_skdm_recipients(group_jid)
183            .await
184            .map_err(db_err)
185    }
186
187    pub async fn add_skdm_recipients(
188        &self,
189        group_jid: &str,
190        device_jids: &[Jid],
191    ) -> Result<(), StoreError> {
192        self.backend
193            .add_skdm_recipients(group_jid, device_jids)
194            .await
195            .map_err(db_err)
196    }
197
198    pub async fn clear_skdm_recipients(&self, group_jid: &str) -> Result<(), StoreError> {
199        self.backend
200            .clear_skdm_recipients(group_jid)
201            .await
202            .map_err(db_err)
203    }
204}