1use crate::client::ThingsCloudClient;
2use crate::store::{fold_item, RawState};
3use crate::wire::wire_object::WireItem;
4use anyhow::{anyhow, Context, Result};
5use serde::{Deserialize, Serialize};
6use serde_json::Value;
7use std::fs::{self, File, OpenOptions};
8use std::io::{BufRead, BufReader, Seek, SeekFrom, Write};
9use std::path::{Path, PathBuf};
10
/// Client fields carried back from the background sync worker.
///
/// `get_state_with_append_log` runs the sync on a cloned client, so the values the
/// clone ends up with are returned in this struct and copied onto the caller's client.
#[derive(Clone, Debug, Default)]
struct SyncSnapshot {
    /// History key held by the worker's client after syncing.
    history_key: Option<String>,
    /// Head index held by the worker's client after syncing.
    head_index: i64,
}
16
17#[derive(Debug, Clone, Serialize, Deserialize, Default)]
18struct CursorData {
19 next_start_index: i64,
20 history_key: String,
21 #[serde(default)]
22 head_index: i64,
23}
24
25#[derive(Debug, Clone, Serialize, Deserialize, Default)]
26struct StateCacheData {
27 #[serde(default)]
28 version: u8,
29 log_offset: u64,
30 state: RawState,
31}
32
/// Version stamped into every state cache this module writes. `read_state_cache`
/// ignores any cache whose stored version differs, forcing a rebuild from the log.
const STATE_CACHE_VERSION: u8 = 2;
34
35fn read_cursor(path: &Path) -> CursorData {
36 if !path.exists() {
37 return CursorData::default();
38 }
39 let Ok(raw) = fs::read_to_string(path) else {
40 return CursorData::default();
41 };
42 serde_json::from_str(&raw).unwrap_or_default()
43}
44
45fn write_cursor(
46 path: &Path,
47 next_start_index: i64,
48 history_key: &str,
49 head_index: i64,
50) -> Result<()> {
51 let payload = serde_json::to_string(&serde_json::json!({
52 "next_start_index": next_start_index,
53 "history_key": history_key,
54 "head_index": head_index,
55 "updated_at": crate::client::now_timestamp(),
56 }))?;
57 let tmp = path.with_extension("tmp");
58 fs::write(&tmp, payload)?;
59 fs::rename(tmp, path)?;
60 Ok(())
61}
62
63pub fn sync_append_log(client: &mut ThingsCloudClient, cache_dir: &Path) -> Result<()> {
64 fs::create_dir_all(cache_dir)?;
65 let log_path = cache_dir.join("things.log");
66 let cursor_path = cache_dir.join("cursor.json");
67
68 let cursor = read_cursor(&cursor_path);
69 let mut start_index = cursor.next_start_index;
70
71 if client.history_key.is_none() {
72 if !cursor.history_key.is_empty() {
73 client.history_key = Some(cursor.history_key.clone());
74 } else {
75 let _ = client.authenticate()?;
76 }
77 }
78
79 let mut fp = OpenOptions::new()
80 .create(true)
81 .append(true)
82 .open(&log_path)
83 .with_context(|| format!("failed to open {}", log_path.display()))?;
84
85 loop {
86 let page = match client.get_items_page(start_index) {
87 Ok(v) => v,
88 Err(_) => {
89 let _ = client.authenticate()?;
90 client.get_items_page(start_index)?
91 }
92 };
93
94 let items = page
95 .get("items")
96 .and_then(Value::as_array)
97 .cloned()
98 .unwrap_or_default();
99 let end = page
100 .get("end-total-content-size")
101 .and_then(Value::as_i64)
102 .unwrap_or(0);
103 let latest = page
104 .get("latest-total-content-size")
105 .and_then(Value::as_i64)
106 .unwrap_or(0);
107 client.head_index = page
108 .get("current-item-index")
109 .and_then(Value::as_i64)
110 .unwrap_or(client.head_index);
111
112 for item in &items {
113 writeln!(fp, "{}", serde_json::to_string(item)?)?;
114 }
115
116 if !items.is_empty() {
117 fp.flush()?;
118 start_index += items.len() as i64;
119 write_cursor(
120 &cursor_path,
121 start_index,
122 client.history_key.as_deref().unwrap_or_default(),
123 client.head_index,
124 )?;
125 }
126
127 if items.is_empty() || end >= latest {
128 break;
129 }
130 }
131
132 let current_history_key = client.history_key.clone().unwrap_or_default();
133 if current_history_key != cursor.history_key || client.head_index != cursor.head_index {
134 write_cursor(
135 &cursor_path,
136 start_index,
137 ¤t_history_key,
138 client.head_index,
139 )?;
140 }
141
142 Ok(())
143}
144
145fn read_state_cache(cache_dir: &Path) -> (RawState, u64) {
146 let path = cache_dir.join("state_cache.json");
147 if !path.exists() {
148 return (RawState::new(), 0);
149 }
150 let Ok(raw) = fs::read_to_string(&path) else {
151 return (RawState::new(), 0);
152 };
153 let Ok(cache) = serde_json::from_str::<StateCacheData>(&raw) else {
154 return (RawState::new(), 0);
155 };
156
157 if cache.version != STATE_CACHE_VERSION {
158 return (RawState::new(), 0);
159 }
160
161 (cache.state, cache.log_offset)
162}
163
164fn write_state_cache(cache_dir: &Path, state: &RawState, log_offset: u64) -> Result<()> {
165 let path = cache_dir.join("state_cache.json");
166 let payload = serde_json::to_string(&StateCacheData {
167 version: STATE_CACHE_VERSION,
168 log_offset,
169 state: state.clone(),
170 })?;
171 let tmp = path.with_extension("tmp");
172 fs::write(&tmp, payload)?;
173 fs::rename(tmp, path)?;
174 Ok(())
175}
176
177pub fn fold_state_from_append_log(cache_dir: &Path) -> Result<RawState> {
178 let log_path = cache_dir.join("things.log");
179 if !log_path.exists() {
180 return Ok(RawState::new());
181 }
182
183 let (mut state, byte_offset) = read_state_cache(cache_dir);
184 let mut new_lines = 0u64;
185
186 let mut file =
187 File::open(&log_path).with_context(|| format!("failed to open {}", log_path.display()))?;
188 file.seek(SeekFrom::Start(byte_offset))?;
189 let mut reader = BufReader::new(file);
190 let mut line = String::new();
191 let mut safe_offset = byte_offset;
192
193 loop {
194 line.clear();
195 let read = reader.read_line(&mut line)?;
196 if read == 0 {
197 break;
198 }
199
200 if !line.ends_with('\n') {
201 break;
202 }
203
204 let stripped = line.trim();
205 if stripped.is_empty() {
206 safe_offset = reader.stream_position()?;
207 continue;
208 }
209 let item: WireItem = serde_json::from_str(stripped)
210 .with_context(|| format!("Corrupt log entry at {}", log_path.display()))?;
211 fold_item(item, &mut state);
212 new_lines += 1;
213 safe_offset = reader.stream_position()?;
214 }
215
216 if new_lines > 0 {
217 write_state_cache(cache_dir, &state, safe_offset)?;
218 }
219
220 Ok(state)
221}
222
223pub fn get_state_with_append_log(
224 client: &mut ThingsCloudClient,
225 cache_dir: PathBuf,
226) -> Result<RawState> {
227 let mut sync_client = client.clone();
228 let sync_cache_dir = cache_dir.clone();
229
230 let sync_worker = std::thread::spawn(move || -> Result<SyncSnapshot> {
231 sync_append_log(&mut sync_client, &sync_cache_dir)?;
232 Ok(SyncSnapshot {
233 history_key: sync_client.history_key,
234 head_index: sync_client.head_index,
235 })
236 });
237
238 let _stale_state = fold_state_from_append_log(&cache_dir)?;
239
240 let sync_snapshot = sync_worker
241 .join()
242 .map_err(|_| anyhow!("sync worker panicked"))??;
243
244 client.history_key = sync_snapshot.history_key;
245 client.head_index = sync_snapshot.head_index;
246
247 fold_state_from_append_log(&cache_dir)
248}
249
250pub fn fold_state_from_append_log_or_empty(cache_dir: &Path) -> RawState {
251 fold_state_from_append_log(cache_dir).unwrap_or_default()
252}
253
254pub fn read_cached_head_index(cache_dir: &Path) -> i64 {
255 read_cursor(&cache_dir.join("cursor.json")).head_index
256}
257
258pub fn sync_append_log_or_err(client: &mut ThingsCloudClient, cache_dir: &Path) -> Result<()> {
259 sync_append_log(client, cache_dir).map_err(|e| anyhow!(e.to_string()))
260}
261
#[cfg(test)]
mod tests {
    use super::*;

    /// A record whose terminating newline has not been written yet must neither be
    /// folded nor counted in the cached offset; once the rest of the line arrives, the
    /// next fold picks it up and the cached offset reaches the end of the log.
    #[test]
    fn fold_state_ignores_trailing_partial_line() {
        let scratch = tempfile::tempdir().expect("tempdir");
        let cache_dir = scratch.path();
        let log_path = cache_dir.join("things.log");

        let complete = r#"{"3C6BBD49-8D11-4FFF-8B0E-B8F33FA9C00A":{"t":0,"e":"Settings5","p":{}}}"#;
        let interrupted = r#"{"4C6BBD49-8D11-4FFF-8B0E-B8F33FA9C00B":{"t":0,"e":"Settings5","p":{}}}"#;
        let (head, tail) = interrupted.split_at(interrupted.len() / 2);

        // Seed the log with one full record followed by the first half of another.
        let mut seed = String::from(complete);
        seed.push('\n');
        seed.push_str(head);
        fs::write(&log_path, seed).expect("seed log");

        let first_state = fold_state_from_append_log(cache_dir).expect("first fold");
        assert_eq!(first_state.len(), 1);

        let (_, first_offset) = read_state_cache(cache_dir);
        assert_eq!(first_offset, (complete.len() + 1) as u64);

        // Complete the interrupted record, newline included.
        let mut log = OpenOptions::new()
            .append(true)
            .open(&log_path)
            .expect("open log for append");
        writeln!(log, "{}", tail).expect("append line remainder");

        let second_state = fold_state_from_append_log(cache_dir).expect("second fold");
        assert_eq!(second_state.len(), 2);

        let log_len = fs::metadata(&log_path).expect("log metadata").len();
        let (_, second_offset) = read_state_cache(cache_dir);
        assert_eq!(second_offset, log_len);
    }
}