things3_cloud/
log_cache.rs1use crate::client::ThingsCloudClient;
2use crate::store::{fold_item, RawState};
3use crate::wire::wire_object::WireItem;
4use anyhow::{anyhow, Context, Result};
5use serde::{Deserialize, Serialize};
6use serde_json::Value;
7use std::fs::{self, File, OpenOptions};
8use std::io::{BufRead, BufReader, Seek, SeekFrom, Write};
9use std::path::{Path, PathBuf};
10
11#[derive(Debug, Clone, Serialize, Deserialize, Default)]
12struct CursorData {
13 next_start_index: i64,
14 history_key: String,
15 #[serde(default)]
16 head_index: i64,
17}
18
19#[derive(Debug, Clone, Serialize, Deserialize, Default)]
20struct StateCacheData {
21 #[serde(default)]
22 version: u8,
23 log_offset: u64,
24 state: RawState,
25}
26
27const STATE_CACHE_VERSION: u8 = 2;
28
29fn read_cursor(path: &Path) -> CursorData {
30 if !path.exists() {
31 return CursorData::default();
32 }
33 let Ok(raw) = fs::read_to_string(path) else {
34 return CursorData::default();
35 };
36 serde_json::from_str(&raw).unwrap_or_default()
37}
38
39fn write_cursor(
40 path: &Path,
41 next_start_index: i64,
42 history_key: &str,
43 head_index: i64,
44) -> Result<()> {
45 let payload = serde_json::to_string(&serde_json::json!({
46 "next_start_index": next_start_index,
47 "history_key": history_key,
48 "head_index": head_index,
49 "updated_at": crate::client::now_timestamp(),
50 }))?;
51 let tmp = path.with_extension("tmp");
52 fs::write(&tmp, payload)?;
53 fs::rename(tmp, path)?;
54 Ok(())
55}
56
57pub fn sync_append_log(client: &mut ThingsCloudClient, cache_dir: &Path) -> Result<()> {
58 fs::create_dir_all(cache_dir)?;
59 let log_path = cache_dir.join("things.log");
60 let cursor_path = cache_dir.join("cursor.json");
61
62 let cursor = read_cursor(&cursor_path);
63 let mut start_index = cursor.next_start_index;
64
65 if client.history_key.is_none() {
66 if !cursor.history_key.is_empty() {
67 client.history_key = Some(cursor.history_key.clone());
68 } else {
69 let _ = client.authenticate()?;
70 }
71 }
72
73 let mut fp = OpenOptions::new()
74 .create(true)
75 .append(true)
76 .open(&log_path)
77 .with_context(|| format!("failed to open {}", log_path.display()))?;
78
79 loop {
80 let page = match client.get_items_page(start_index) {
81 Ok(v) => v,
82 Err(_) => {
83 let _ = client.authenticate()?;
84 client.get_items_page(start_index)?
85 }
86 };
87
88 let items = page
89 .get("items")
90 .and_then(Value::as_array)
91 .cloned()
92 .unwrap_or_default();
93 let end = page
94 .get("end-total-content-size")
95 .and_then(Value::as_i64)
96 .unwrap_or(0);
97 let latest = page
98 .get("latest-total-content-size")
99 .and_then(Value::as_i64)
100 .unwrap_or(0);
101 client.head_index = page
102 .get("current-item-index")
103 .and_then(Value::as_i64)
104 .unwrap_or(client.head_index);
105
106 for item in &items {
107 writeln!(fp, "{}", serde_json::to_string(item)?)?;
108 }
109
110 if !items.is_empty() {
111 fp.flush()?;
112 start_index += items.len() as i64;
113 write_cursor(
114 &cursor_path,
115 start_index,
116 client.history_key.as_deref().unwrap_or_default(),
117 client.head_index,
118 )?;
119 }
120
121 if items.is_empty() || end >= latest {
122 break;
123 }
124 }
125
126 let current_history_key = client.history_key.clone().unwrap_or_default();
127 if current_history_key != cursor.history_key || client.head_index != cursor.head_index {
128 write_cursor(
129 &cursor_path,
130 start_index,
131 ¤t_history_key,
132 client.head_index,
133 )?;
134 }
135
136 Ok(())
137}
138
139fn read_state_cache(cache_dir: &Path) -> (RawState, u64) {
140 let path = cache_dir.join("state_cache.json");
141 if !path.exists() {
142 return (RawState::new(), 0);
143 }
144 let Ok(raw) = fs::read_to_string(&path) else {
145 return (RawState::new(), 0);
146 };
147 let Ok(cache) = serde_json::from_str::<StateCacheData>(&raw) else {
148 return (RawState::new(), 0);
149 };
150
151 if cache.version != STATE_CACHE_VERSION {
152 return (RawState::new(), 0);
153 }
154
155 (cache.state, cache.log_offset)
156}
157
158fn write_state_cache(cache_dir: &Path, state: &RawState, log_offset: u64) -> Result<()> {
159 let path = cache_dir.join("state_cache.json");
160 let payload = serde_json::to_string(&StateCacheData {
161 version: STATE_CACHE_VERSION,
162 log_offset,
163 state: state.clone(),
164 })?;
165 let tmp = path.with_extension("tmp");
166 fs::write(&tmp, payload)?;
167 fs::rename(tmp, path)?;
168 Ok(())
169}
170
171pub fn fold_state_from_append_log(cache_dir: &Path) -> Result<RawState> {
172 let log_path = cache_dir.join("things.log");
173 if !log_path.exists() {
174 return Ok(RawState::new());
175 }
176
177 let (mut state, byte_offset) = read_state_cache(cache_dir);
178 let mut new_lines = 0u64;
179
180 let mut file =
181 File::open(&log_path).with_context(|| format!("failed to open {}", log_path.display()))?;
182 file.seek(SeekFrom::Start(byte_offset))?;
183 let mut reader = BufReader::new(file);
184 let mut line = String::new();
185
186 loop {
187 line.clear();
188 let read = reader.read_line(&mut line)?;
189 if read == 0 {
190 break;
191 }
192 let stripped = line.trim();
193 if stripped.is_empty() {
194 continue;
195 }
196 let item: WireItem = serde_json::from_str(stripped)
197 .with_context(|| format!("Corrupt log entry at {}", log_path.display()))?;
198 fold_item(item, &mut state);
199 new_lines += 1;
200 }
201
202 let end_offset = reader.stream_position()?;
203 if new_lines > 0 {
204 write_state_cache(cache_dir, &state, end_offset)?;
205 }
206
207 Ok(state)
208}
209
210pub fn get_state_with_append_log(
211 client: &mut ThingsCloudClient,
212 cache_dir: PathBuf,
213) -> Result<RawState> {
214 sync_append_log(client, &cache_dir)?;
215 fold_state_from_append_log(&cache_dir)
216}
217
218pub fn fold_state_from_append_log_or_empty(cache_dir: &Path) -> RawState {
219 fold_state_from_append_log(cache_dir).unwrap_or_default()
220}
221
222pub fn read_cached_head_index(cache_dir: &Path) -> i64 {
223 read_cursor(&cache_dir.join("cursor.json")).head_index
224}
225
226pub fn sync_append_log_or_err(client: &mut ThingsCloudClient, cache_dir: &Path) -> Result<()> {
227 sync_append_log(client, cache_dir).map_err(|e| anyhow!(e.to_string()))
228}