Skip to main content

things3_cloud/
log_cache.rs

1use crate::client::ThingsCloudClient;
2use crate::store::{fold_item, RawState};
3use crate::wire::wire_object::WireItem;
4use anyhow::{anyhow, Context, Result};
5use serde::{Deserialize, Serialize};
6use serde_json::Value;
7use std::fs::{self, File, OpenOptions};
8use std::io::{BufRead, BufReader, Seek, SeekFrom, Write};
9use std::path::{Path, PathBuf};
10
11#[derive(Debug, Clone, Serialize, Deserialize, Default)]
12struct CursorData {
13    next_start_index: i64,
14    history_key: String,
15    #[serde(default)]
16    head_index: i64,
17}
18
19#[derive(Debug, Clone, Serialize, Deserialize, Default)]
20struct StateCacheData {
21    #[serde(default)]
22    version: u8,
23    log_offset: u64,
24    state: RawState,
25}
26
/// Snapshot format version written into `state_cache.json`; snapshots that
/// carry any other version are ignored when read back.
const STATE_CACHE_VERSION: u8 = 2;
28
29fn read_cursor(path: &Path) -> CursorData {
30    if !path.exists() {
31        return CursorData::default();
32    }
33    let Ok(raw) = fs::read_to_string(path) else {
34        return CursorData::default();
35    };
36    serde_json::from_str(&raw).unwrap_or_default()
37}
38
39fn write_cursor(
40    path: &Path,
41    next_start_index: i64,
42    history_key: &str,
43    head_index: i64,
44) -> Result<()> {
45    let payload = serde_json::to_string(&serde_json::json!({
46        "next_start_index": next_start_index,
47        "history_key": history_key,
48        "head_index": head_index,
49        "updated_at": crate::client::now_timestamp(),
50    }))?;
51    let tmp = path.with_extension("tmp");
52    fs::write(&tmp, payload)?;
53    fs::rename(tmp, path)?;
54    Ok(())
55}
56
57pub fn sync_append_log(client: &mut ThingsCloudClient, cache_dir: &Path) -> Result<()> {
58    fs::create_dir_all(cache_dir)?;
59    let log_path = cache_dir.join("things.log");
60    let cursor_path = cache_dir.join("cursor.json");
61
62    let cursor = read_cursor(&cursor_path);
63    let mut start_index = cursor.next_start_index;
64
65    if client.history_key.is_none() {
66        if !cursor.history_key.is_empty() {
67            client.history_key = Some(cursor.history_key.clone());
68        } else {
69            let _ = client.authenticate()?;
70        }
71    }
72
73    let mut fp = OpenOptions::new()
74        .create(true)
75        .append(true)
76        .open(&log_path)
77        .with_context(|| format!("failed to open {}", log_path.display()))?;
78
79    loop {
80        let page = match client.get_items_page(start_index) {
81            Ok(v) => v,
82            Err(_) => {
83                let _ = client.authenticate()?;
84                client.get_items_page(start_index)?
85            }
86        };
87
88        let items = page
89            .get("items")
90            .and_then(Value::as_array)
91            .cloned()
92            .unwrap_or_default();
93        let end = page
94            .get("end-total-content-size")
95            .and_then(Value::as_i64)
96            .unwrap_or(0);
97        let latest = page
98            .get("latest-total-content-size")
99            .and_then(Value::as_i64)
100            .unwrap_or(0);
101        client.head_index = page
102            .get("current-item-index")
103            .and_then(Value::as_i64)
104            .unwrap_or(client.head_index);
105
106        for item in &items {
107            writeln!(fp, "{}", serde_json::to_string(item)?)?;
108        }
109
110        if !items.is_empty() {
111            fp.flush()?;
112            start_index += items.len() as i64;
113            write_cursor(
114                &cursor_path,
115                start_index,
116                client.history_key.as_deref().unwrap_or_default(),
117                client.head_index,
118            )?;
119        }
120
121        if items.is_empty() || end >= latest {
122            break;
123        }
124    }
125
126    let current_history_key = client.history_key.clone().unwrap_or_default();
127    if current_history_key != cursor.history_key || client.head_index != cursor.head_index {
128        write_cursor(
129            &cursor_path,
130            start_index,
131            &current_history_key,
132            client.head_index,
133        )?;
134    }
135
136    Ok(())
137}
138
139fn read_state_cache(cache_dir: &Path) -> (RawState, u64) {
140    let path = cache_dir.join("state_cache.json");
141    if !path.exists() {
142        return (RawState::new(), 0);
143    }
144    let Ok(raw) = fs::read_to_string(&path) else {
145        return (RawState::new(), 0);
146    };
147    let Ok(cache) = serde_json::from_str::<StateCacheData>(&raw) else {
148        return (RawState::new(), 0);
149    };
150
151    if cache.version != STATE_CACHE_VERSION {
152        return (RawState::new(), 0);
153    }
154
155    (cache.state, cache.log_offset)
156}
157
158fn write_state_cache(cache_dir: &Path, state: &RawState, log_offset: u64) -> Result<()> {
159    let path = cache_dir.join("state_cache.json");
160    let payload = serde_json::to_string(&StateCacheData {
161        version: STATE_CACHE_VERSION,
162        log_offset,
163        state: state.clone(),
164    })?;
165    let tmp = path.with_extension("tmp");
166    fs::write(&tmp, payload)?;
167    fs::rename(tmp, path)?;
168    Ok(())
169}
170
171pub fn fold_state_from_append_log(cache_dir: &Path) -> Result<RawState> {
172    let log_path = cache_dir.join("things.log");
173    if !log_path.exists() {
174        return Ok(RawState::new());
175    }
176
177    let (mut state, byte_offset) = read_state_cache(cache_dir);
178    let mut new_lines = 0u64;
179
180    let mut file =
181        File::open(&log_path).with_context(|| format!("failed to open {}", log_path.display()))?;
182    file.seek(SeekFrom::Start(byte_offset))?;
183    let mut reader = BufReader::new(file);
184    let mut line = String::new();
185
186    loop {
187        line.clear();
188        let read = reader.read_line(&mut line)?;
189        if read == 0 {
190            break;
191        }
192        let stripped = line.trim();
193        if stripped.is_empty() {
194            continue;
195        }
196        let item: WireItem = serde_json::from_str(stripped)
197            .with_context(|| format!("Corrupt log entry at {}", log_path.display()))?;
198        fold_item(item, &mut state);
199        new_lines += 1;
200    }
201
202    let end_offset = reader.stream_position()?;
203    if new_lines > 0 {
204        write_state_cache(cache_dir, &state, end_offset)?;
205    }
206
207    Ok(state)
208}
209
210pub fn get_state_with_append_log(
211    client: &mut ThingsCloudClient,
212    cache_dir: PathBuf,
213) -> Result<RawState> {
214    sync_append_log(client, &cache_dir)?;
215    fold_state_from_append_log(&cache_dir)
216}
217
218pub fn fold_state_from_append_log_or_empty(cache_dir: &Path) -> RawState {
219    fold_state_from_append_log(cache_dir).unwrap_or_default()
220}
221
222pub fn read_cached_head_index(cache_dir: &Path) -> i64 {
223    read_cursor(&cache_dir.join("cursor.json")).head_index
224}
225
226pub fn sync_append_log_or_err(client: &mut ThingsCloudClient, cache_dir: &Path) -> Result<()> {
227    sync_append_log(client, cache_dir).map_err(|e| anyhow!(e.to_string()))
228}