whatsapp_rust/store/
persistence_manager.rs1use super::error::{StoreError, db_err};
2use crate::store::Device;
3use crate::store::traits::Backend;
4use async_lock::RwLock;
5use event_listener::Event;
6use futures::FutureExt;
7use log::{debug, error};
8use std::sync::Arc;
9use std::sync::atomic::{AtomicBool, Ordering};
10use std::time::Duration;
11use wacore::runtime::Runtime;
12use wacore_binary::jid::Jid;
13
14pub struct PersistenceManager {
15 device: Arc<RwLock<Device>>,
16 backend: Arc<dyn Backend>,
17 dirty: Arc<AtomicBool>,
18 save_notify: Arc<Event>,
19}
20
21impl PersistenceManager {
22 pub async fn new(backend: Arc<dyn Backend>) -> Result<Self, StoreError> {
27 debug!("PersistenceManager: Ensuring device row exists.");
28 let exists = backend.exists().await.map_err(db_err)?;
30 if !exists {
31 debug!("PersistenceManager: No device row found. Creating new device row.");
32 let id = backend.create().await.map_err(db_err)?;
33 debug!("PersistenceManager: Created device row with id={id}.");
34 }
35
36 debug!("PersistenceManager: Attempting to load device data via Backend.");
37 let device_data_opt = backend.load().await.map_err(db_err)?;
38
39 let device = if let Some(serializable_device) = device_data_opt {
40 debug!(
41 "PersistenceManager: Loaded existing device data (PushName: '{}'). Initializing Device.",
42 serializable_device.push_name
43 );
44 let mut dev = Device::new(backend.clone());
45 dev.load_from_serializable(serializable_device);
46 dev
47 } else {
48 debug!("PersistenceManager: No data yet; initializing default Device in memory.");
49 Device::new(backend.clone())
50 };
51
52 Ok(Self {
53 device: Arc::new(RwLock::new(device)),
54 backend,
55 dirty: Arc::new(AtomicBool::new(false)),
56 save_notify: Arc::new(Event::new()),
57 })
58 }
59
60 pub async fn get_device_arc(&self) -> Arc<RwLock<Device>> {
61 self.device.clone()
62 }
63
64 pub async fn get_device_snapshot(&self) -> Device {
65 self.device.read().await.clone()
66 }
67
68 pub fn backend(&self) -> Arc<dyn Backend> {
69 self.backend.clone()
70 }
71
72 pub async fn modify_device<F, R>(&self, modifier: F) -> R
73 where
74 F: FnOnce(&mut Device) -> R,
75 {
76 let mut device_guard = self.device.write().await;
77 let result = modifier(&mut device_guard);
78
79 self.dirty.store(true, Ordering::Relaxed);
80 self.save_notify.notify(1);
81
82 result
83 }
84
85 pub async fn flush(&self) -> Result<(), StoreError> {
87 self.save_to_disk().await
88 }
89
90 async fn save_to_disk(&self) -> Result<(), StoreError> {
91 if self.dirty.swap(false, Ordering::AcqRel) {
92 debug!("Device state is dirty, saving to disk.");
93 let device_guard = self.device.read().await;
94 let serializable_device = device_guard.to_serializable();
95 drop(device_guard);
96
97 if let Err(e) = self.backend.save(&serializable_device).await {
98 self.dirty.store(true, Ordering::Release);
100 return Err(db_err(e));
101 }
102 debug!("Device state saved successfully.");
103 }
104 Ok(())
105 }
106
107 pub async fn create_snapshot(
110 &self,
111 name: &str,
112 extra_content: Option<&[u8]>,
113 ) -> Result<(), StoreError> {
114 #[cfg(feature = "debug-snapshots")]
115 {
116 self.save_to_disk().await?;
118 self.backend
119 .snapshot_db(name, extra_content)
120 .await
121 .map_err(db_err)
122 }
123 #[cfg(not(feature = "debug-snapshots"))]
124 {
125 let _ = name;
126 let _ = extra_content;
127 log::warn!("Snapshot requested but 'debug-snapshots' feature is disabled");
128 Ok(())
129 }
130 }
131
132 pub fn run_background_saver(self: Arc<Self>, runtime: Arc<dyn Runtime>, interval: Duration) {
133 let rt = runtime.clone();
134 let weak = Arc::downgrade(&self);
135 drop(self); runtime
137 .spawn(Box::pin(async move {
138 loop {
139 let Some(this) = weak.upgrade() else {
140 debug!("PersistenceManager dropped, exiting background saver.");
141 return;
142 };
143 let listener = this.save_notify.listen();
145 drop(this); futures::select! {
148 _ = listener.fuse() => {
149 debug!("Save notification received.");
150 }
151 _ = rt.sleep(interval).fuse() => {}
152 }
153
154 let Some(this) = weak.upgrade() else {
155 debug!("PersistenceManager dropped, exiting background saver.");
156 return;
157 };
158 if let Err(e) = this.save_to_disk().await {
159 error!("Error saving device state in background: {e}");
160 }
161 }
162 }))
163 .detach();
164 debug!("Background saver task started with interval {interval:?}");
165 }
166}
167
168use super::commands::{DeviceCommand, apply_command_to_device};
169
170impl PersistenceManager {
171 pub async fn process_command(&self, command: DeviceCommand) {
172 self.modify_device(|device| {
173 apply_command_to_device(device, command);
174 })
175 .await;
176 }
177}
178
179impl PersistenceManager {
180 pub async fn get_skdm_recipients(&self, group_jid: &str) -> Result<Vec<Jid>, StoreError> {
181 self.backend
182 .get_skdm_recipients(group_jid)
183 .await
184 .map_err(db_err)
185 }
186
187 pub async fn add_skdm_recipients(
188 &self,
189 group_jid: &str,
190 device_jids: &[Jid],
191 ) -> Result<(), StoreError> {
192 self.backend
193 .add_skdm_recipients(group_jid, device_jids)
194 .await
195 .map_err(db_err)
196 }
197
198 pub async fn clear_skdm_recipients(&self, group_jid: &str) -> Result<(), StoreError> {
199 self.backend
200 .clear_skdm_recipients(group_jid)
201 .await
202 .map_err(db_err)
203 }
204}