1use std::{
2 fs::{self, File, OpenOptions},
3 io::{BufRead, BufReader, Seek, SeekFrom, Write},
4 path::{Path, PathBuf},
5};
6
7use anyhow::{Context, Result, anyhow};
8use serde::{Deserialize, Serialize};
9use serde_json::Value;
10
11use crate::{
12 client::ThingsCloudClient,
13 store::{RawState, fold_item},
14 wire::wire_object::WireItem,
15};
16
/// Sync position carried out of the background sync worker.
///
/// Holds the two client fields that are copied back onto the caller's client
/// after the worker thread has been joined.
#[derive(Clone, Debug, Default)]
struct SyncSnapshot {
    /// History key held by the worker's client when it finished, if any.
    history_key: Option<String>,
    /// Head index held by the worker's client when it finished.
    head_index: i64,
}
22
23#[derive(Debug, Clone, Serialize, Deserialize, Default)]
24struct CursorData {
25 next_start_index: i64,
26 history_key: String,
27 #[serde(default)]
28 head_index: i64,
29}
30
31#[derive(Debug, Clone, Serialize, Deserialize, Default)]
32struct StateCacheData {
33 #[serde(default)]
34 version: u8,
35 log_offset: u64,
36 state: RawState,
37}
38
39const STATE_CACHE_VERSION: u8 = 2;
40
41fn read_cursor(path: &Path) -> CursorData {
42 if !path.exists() {
43 return CursorData::default();
44 }
45 let Ok(raw) = fs::read_to_string(path) else {
46 return CursorData::default();
47 };
48 serde_json::from_str(&raw).unwrap_or_default()
49}
50
51fn write_cursor(
52 path: &Path,
53 next_start_index: i64,
54 history_key: &str,
55 head_index: i64,
56) -> Result<()> {
57 let payload = serde_json::to_string(&serde_json::json!({
58 "next_start_index": next_start_index,
59 "history_key": history_key,
60 "head_index": head_index,
61 "updated_at": crate::client::now_timestamp(),
62 }))?;
63 let tmp = path.with_extension("tmp");
64 fs::write(&tmp, payload)?;
65 fs::rename(tmp, path)?;
66 Ok(())
67}
68
69pub fn sync_append_log(client: &mut ThingsCloudClient, cache_dir: &Path) -> Result<()> {
70 fs::create_dir_all(cache_dir)?;
71 let log_path = cache_dir.join("things.log");
72 let cursor_path = cache_dir.join("cursor.json");
73
74 let cursor = read_cursor(&cursor_path);
75 let mut start_index = cursor.next_start_index;
76
77 if client.history_key.is_none() {
78 if !cursor.history_key.is_empty() {
79 client.history_key = Some(cursor.history_key.clone());
80 } else {
81 let _ = client.authenticate()?;
82 }
83 }
84
85 let mut fp = OpenOptions::new()
86 .create(true)
87 .append(true)
88 .open(&log_path)
89 .with_context(|| format!("failed to open {}", log_path.display()))?;
90
91 loop {
92 let page = match client.get_items_page(start_index) {
93 Ok(v) => v,
94 Err(_) => {
95 let _ = client.authenticate()?;
96 client.get_items_page(start_index)?
97 }
98 };
99
100 let items = page
101 .get("items")
102 .and_then(Value::as_array)
103 .cloned()
104 .unwrap_or_default();
105 let end = page
106 .get("end-total-content-size")
107 .and_then(Value::as_i64)
108 .unwrap_or(0);
109 let latest = page
110 .get("latest-total-content-size")
111 .and_then(Value::as_i64)
112 .unwrap_or(0);
113 client.head_index = page
114 .get("current-item-index")
115 .and_then(Value::as_i64)
116 .unwrap_or(client.head_index);
117
118 for item in &items {
119 writeln!(fp, "{}", serde_json::to_string(item)?)?;
120 }
121
122 if !items.is_empty() {
123 fp.flush()?;
124 start_index += items.len() as i64;
125 write_cursor(
126 &cursor_path,
127 start_index,
128 client.history_key.as_deref().unwrap_or_default(),
129 client.head_index,
130 )?;
131 }
132
133 if items.is_empty() || end >= latest {
134 break;
135 }
136 }
137
138 let current_history_key = client.history_key.clone().unwrap_or_default();
139 if current_history_key != cursor.history_key || client.head_index != cursor.head_index {
140 write_cursor(
141 &cursor_path,
142 start_index,
143 ¤t_history_key,
144 client.head_index,
145 )?;
146 }
147
148 Ok(())
149}
150
151fn read_state_cache(cache_dir: &Path) -> (RawState, u64) {
152 let path = cache_dir.join("state_cache.json");
153 if !path.exists() {
154 return (RawState::new(), 0);
155 }
156 let Ok(raw) = fs::read_to_string(&path) else {
157 return (RawState::new(), 0);
158 };
159 let Ok(cache) = serde_json::from_str::<StateCacheData>(&raw) else {
160 return (RawState::new(), 0);
161 };
162
163 if cache.version != STATE_CACHE_VERSION {
164 return (RawState::new(), 0);
165 }
166
167 (cache.state, cache.log_offset)
168}
169
170fn write_state_cache(cache_dir: &Path, state: &RawState, log_offset: u64) -> Result<()> {
171 let path = cache_dir.join("state_cache.json");
172 let payload = serde_json::to_string(&StateCacheData {
173 version: STATE_CACHE_VERSION,
174 log_offset,
175 state: state.clone(),
176 })?;
177 let tmp = path.with_extension("tmp");
178 fs::write(&tmp, payload)?;
179 fs::rename(tmp, path)?;
180 Ok(())
181}
182
183pub fn fold_state_from_append_log(cache_dir: &Path) -> Result<RawState> {
184 let log_path = cache_dir.join("things.log");
185 if !log_path.exists() {
186 return Ok(RawState::new());
187 }
188
189 let (mut state, byte_offset) = read_state_cache(cache_dir);
190 let mut new_lines = 0u64;
191
192 let mut file =
193 File::open(&log_path).with_context(|| format!("failed to open {}", log_path.display()))?;
194 file.seek(SeekFrom::Start(byte_offset))?;
195 let mut reader = BufReader::new(file);
196 let mut line = String::new();
197 let mut safe_offset = byte_offset;
198
199 loop {
200 line.clear();
201 let read = reader.read_line(&mut line)?;
202 if read == 0 {
203 break;
204 }
205
206 if !line.ends_with('\n') {
207 break;
208 }
209
210 let stripped = line.trim();
211 if stripped.is_empty() {
212 safe_offset = reader.stream_position()?;
213 continue;
214 }
215 let item: WireItem = serde_json::from_str(stripped)
216 .with_context(|| format!("Corrupt log entry at {}", log_path.display()))?;
217 fold_item(item, &mut state);
218 new_lines += 1;
219 safe_offset = reader.stream_position()?;
220 }
221
222 if new_lines > 0 {
223 write_state_cache(cache_dir, &state, safe_offset)?;
224 }
225
226 Ok(state)
227}
228
229pub fn get_state_with_append_log(
230 client: &mut ThingsCloudClient,
231 cache_dir: PathBuf,
232) -> Result<RawState> {
233 let mut sync_client = client.clone();
234 let sync_cache_dir = cache_dir.clone();
235
236 let sync_worker = std::thread::spawn(move || -> Result<SyncSnapshot> {
237 sync_append_log(&mut sync_client, &sync_cache_dir)?;
238 Ok(SyncSnapshot {
239 history_key: sync_client.history_key,
240 head_index: sync_client.head_index,
241 })
242 });
243
244 let _stale_state = fold_state_from_append_log(&cache_dir)?;
245
246 let sync_snapshot = sync_worker
247 .join()
248 .map_err(|_| anyhow!("sync worker panicked"))??;
249
250 client.history_key = sync_snapshot.history_key;
251 client.head_index = sync_snapshot.head_index;
252
253 fold_state_from_append_log(&cache_dir)
254}
255
256pub fn fold_state_from_append_log_or_empty(cache_dir: &Path) -> RawState {
257 fold_state_from_append_log(cache_dir).unwrap_or_default()
258}
259
260pub fn read_cached_head_index(cache_dir: &Path) -> i64 {
261 read_cursor(&cache_dir.join("cursor.json")).head_index
262}
263
264pub fn sync_append_log_or_err(client: &mut ThingsCloudClient, cache_dir: &Path) -> Result<()> {
265 sync_append_log(client, cache_dir).map_err(|e| anyhow!(e.to_string()))
266}
267
#[cfg(test)]
mod tests {
    use super::*;

    /// A final log line that has no newline yet must be left unfolded, with the
    /// cached offset kept in front of it, and picked up once it is completed.
    #[test]
    fn fold_state_ignores_trailing_partial_line() {
        let temp_dir = tempfile::tempdir().expect("tempdir");
        let cache_dir = temp_dir.path();
        let log_path = cache_dir.join("things.log");

        let complete = r#"{"3C6BBD49-8D11-4FFF-8B0E-B8F33FA9C00A":{"t":0,"e":"Settings5","p":{}}}"#;
        let pending = r#"{"4C6BBD49-8D11-4FFF-8B0E-B8F33FA9C00B":{"t":0,"e":"Settings5","p":{}}}"#;
        let (pending_head, pending_tail) = pending.split_at(pending.len() / 2);

        // Seed the log with one finished line followed by half of the next one.
        let seed = format!("{}\n{}", complete, pending_head);
        fs::write(&log_path, seed).expect("seed log");

        let first_state = fold_state_from_append_log(cache_dir).expect("first fold");
        assert_eq!(first_state.len(), 1);

        // The cached offset stops right after the newline of the finished line.
        let (_, first_offset) = read_state_cache(cache_dir);
        assert_eq!(first_offset, (complete.len() + 1) as u64);

        // Complete the second line.
        let mut log = OpenOptions::new()
            .append(true)
            .open(&log_path)
            .expect("open log for append");
        writeln!(log, "{}", pending_tail).expect("append line remainder");

        let second_state = fold_state_from_append_log(cache_dir).expect("second fold");
        assert_eq!(second_state.len(), 2);

        // Now the whole file has been consumed.
        let expected_offset = fs::metadata(&log_path).expect("log metadata").len();
        let (_, second_offset) = read_state_cache(cache_dir);
        assert_eq!(second_offset, expected_offset);
    }
}