Skip to main content

braid_core/fs/
subscription.rs

1use super::PEER_ID;
2use crate::core::BraidRequest;
3use crate::core::Result;
4use crate::fs::mapping;
5use crate::fs::state::DaemonState;
6use std::collections::HashMap;
7
8pub async fn spawn_subscription(
9    url: String,
10    subscriptions: &mut HashMap<String, tokio::task::JoinHandle<()>>,
11    state: DaemonState,
12) {
13    if subscriptions.contains_key(&url) {
14        return;
15    }
16
17    if !state
18        .config
19        .read()
20        .await
21        .sync
22        .get(&url)
23        .cloned()
24        .unwrap_or(false)
25    {
26        return;
27    }
28
29    let url_capture = url.clone();
30    let state_capture = state.clone();
31    let handle = tokio::spawn(async move {
32        loop {
33            match subscribe_loop(url_capture.clone(), state_capture.clone()).await {
34                Ok(_) => {
35                    tracing::info!(
36                        "Subscription for {} ended normally (disconnect). Retrying in 5s...",
37                        url_capture
38                    );
39                }
40                Err(e) => {
41                    tracing::error!(
42                        "Subscription error for {}: {}. Retrying in 5s...",
43                        url_capture,
44                        e
45                    );
46                }
47            }
48            tokio::time::sleep(std::time::Duration::from_secs(5)).await;
49        }
50    });
51
52    subscriptions.insert(url, handle);
53}
54
55pub async fn subscribe_loop(url: String, state: DaemonState) -> Result<()> {
56    tracing::info!("Subscribing to {}", url);
57
58    let mut req = BraidRequest::new()
59        .subscribe()
60        .with_header("Accept", "text/plain")
61        .with_header("Heartbeats", "30s");
62
63    // For braid.org wiki pages, request the dt (diamond types) merge type
64    // This ensures we get the actual wiki text content, not the HTML shell
65    if url.contains("braid.org") {
66        req = req.with_merge_type("dt");
67    }
68
69    // Add Authentication Headers
70    if let Ok(u) = url::Url::parse(&url) {
71        if let Some(domain) = u.domain() {
72            let cfg = state.config.read().await;
73            if let Some(token) = cfg.cookies.get(domain) {
74                req = req.with_header("Authorization", format!("Bearer {}", token));
75                let cookie_str = if token.contains('=') {
76                    token.clone()
77                } else {
78                    format!("token={}", token)
79                };
80                req = req.with_header("Cookie", cookie_str);
81            }
82        }
83    }
84
85    let mut sub = state.client.subscribe(&url, req).await?;
86    let mut is_first = true;
87
88    while let Some(update) = sub.next().await {
89        let update = update?;
90
91        // Handle 309 Reborn during subscription
92        if update.status == 309 {
93            tracing::warn!(
94                "[BraidFS] Reborn (309) detected during subscription for {}. History reset.",
95                url
96            );
97            is_first = true;
98        }
99
100        // Filter echoes (ignore updates from ourselves)
101        if let Some(v) = update.primary_version() {
102            let v_str = v.to_string();
103            let my_id = PEER_ID.read().await;
104            if !is_first && v_str.contains(&*my_id) {
105                tracing::debug!(
106                    "[BraidFS] Ignoring echo from {} (version matches our PEER_ID {})",
107                    url,
108                    *my_id
109                );
110                continue;
111            }
112        }
113        is_first = false;
114
115        tracing::debug!("Received update from {}: {:?}", url, update.version);
116
117        // Update version store
118        {
119            state.tracker.mark(&url);
120            let mut store = state.version_store.write().await;
121            store.update(&url, update.version.clone(), update.parents.clone());
122            let _ = store.save().await;
123        }
124
125        let patches = match update.patches.as_ref() {
126            Some(p) if !p.is_empty() => p,
127            _ => {
128                // Check if it is a snapshot
129                if let Some(body) = update.body_str() {
130                    tracing::info!(
131                        "[BraidFS-Sub] Snapshot for {}, writing {} bytes",
132                        url,
133                        body.len()
134                    );
135
136                    // Get/Create Merge State (extracting valid content version)
137                    let raw_content = {
138                        let mut merges = state.active_merges.write().await;
139                        let peer_id = PEER_ID.read().await.clone();
140                        let requested_merge_type =
141                            update.merge_type.as_deref().unwrap_or("diamond");
142                        let merge = merges.entry(url.clone()).or_insert_with(|| {
143                            tracing::info!(
144                                "[BraidFS] Creating merge state for {} with type: {}",
145                                url,
146                                requested_merge_type
147                            );
148                            let mut m = state
149                                .merge_registry
150                                .create(requested_merge_type, &peer_id)
151                                .or_else(|| state.merge_registry.create("diamond", &peer_id))
152                                .expect("Failed to create merge type");
153                            m.initialize(body);
154                            m
155                        });
156
157                        let patch = crate::core::merge::MergePatch {
158                            range: "".to_string(),
159                            content: serde_json::Value::String(body.to_string()),
160                            version: update.primary_version().map(|v| v.to_string()),
161                            parents: update.parents.iter().map(|p| p.to_string()).collect(),
162                        };
163                        merge.apply_patch(patch);
164                        merge.get_content()
165                    };
166
167                    // Filter: Auto-Extract Markdown if HTML shell
168                    let final_content = if raw_content.trim().starts_with("<!DOCTYPE")
169                        || raw_content.trim().starts_with("<html")
170                    {
171                        mapping::extract_markdown(&raw_content)
172                    } else {
173                        raw_content
174                    };
175
176                    // Update Content Cache
177                    {
178                        let mut cache = state.content_cache.write().await;
179                        cache.insert(url.clone(), final_content.clone());
180                    }
181
182                    if let Ok(path) = mapping::url_to_path(&url) {
183                        // Add to pending BEFORE writing to avoid echo loop
184                        state.pending.add(path.clone());
185
186                        if let Some(parent) = path.parent() {
187                            ensure_dir_path(parent).await;
188                        }
189
190                        // Atomic Write: Write to .tmp file then rename (Snapshot)
191                        let tmp_path = path.with_extension("tmp");
192                        if let Err(e) = tokio::fs::write(&tmp_path, final_content.clone()).await {
193                            tracing::error!("Failed to write tmp file for snapshot {}: {}", url, e);
194                        } else {
195                            match tokio::fs::rename(&tmp_path, &path).await {
196                                Ok(_) => {
197                                    // Update Content Cache only on success
198                                    let mut cache = state.content_cache.write().await;
199                                    cache.insert(url.clone(), final_content.clone());
200                                }
201                                Err(e) => {
202                                    tracing::error!("Failed to rename tmp file for snapshot {}: {} (fallback direct)", url, e);
203                                    if let Err(e2) = tokio::fs::write(&path, final_content).await {
204                                        tracing::error!(
205                                            "Direct snapshot write failed for {}: {}",
206                                            url,
207                                            e2
208                                        );
209                                    }
210                                }
211                            }
212                        }
213                    }
214                }
215                continue;
216            }
217        };
218
219        let path = mapping::url_to_path(&url)?;
220
221        let content = if path.exists() {
222            tokio::fs::read_to_string(&path).await.unwrap_or_default()
223        } else {
224            String::new()
225        };
226
227        // Apply Patches via Merge State
228        let final_content = {
229            let mut merges = state.active_merges.write().await;
230            let peer_id = PEER_ID.read().await.clone();
231            let requested_merge_type = update.merge_type.as_deref().unwrap_or("diamond");
232            let merge = merges.entry(url.clone()).or_insert_with(|| {
233                tracing::info!(
234                    "[BraidFS] Creating merge state for {} with type: {}",
235                    url,
236                    requested_merge_type
237                );
238                let mut m = state
239                    .merge_registry
240                    .create(requested_merge_type, &peer_id)
241                    .or_else(|| state.merge_registry.create("diamond", &peer_id))
242                    .expect("Failed to create merge type");
243                m.initialize(&content);
244                m
245            });
246
247            for patch in patches {
248                let patch_content = std::str::from_utf8(&patch.content).unwrap_or("");
249
250                let merge_patch = crate::core::merge::MergePatch {
251                    range: patch.range.clone(),
252                    content: serde_json::Value::String(patch_content.to_string()),
253                    version: update.primary_version().map(|v| v.to_string()),
254                    parents: update.parents.iter().map(|p| p.to_string()).collect(),
255                };
256                merge.apply_patch(merge_patch);
257            }
258            merge.get_content()
259        };
260
261        if let Ok(path) = mapping::url_to_path(&url) {
262            // Add to pending BEFORE writing to avoid echo loop
263            state.pending.add(path.clone());
264
265            if let Some(parent) = path.parent() {
266                ensure_dir_path(parent).await;
267            }
268
269            // Atomic Write: Write to .tmp file then rename
270            // This prevents "Access Denied" if the file is open in an editor/viewer
271            let tmp_path = path.with_extension("tmp");
272            if let Err(e) = tokio::fs::write(&tmp_path, final_content.clone()).await {
273                tracing::error!("Failed to write tmp file for {}: {}", url, e);
274            } else {
275                match tokio::fs::rename(&tmp_path, &path).await {
276                    Ok(_) => {
277                        // Update Content Cache only on success
278                        let mut cache = state.content_cache.write().await;
279                        cache.insert(url.clone(), final_content);
280                    }
281                    Err(e) => {
282                        tracing::error!("Failed to rename tmp file for {}: {} (attempting direct write fallback)", url, e);
283                        // Fallback: Try direct write if rename fails (e.g. cross-device, though unlikely in simple sync)
284                        if let Err(e2) = tokio::fs::write(&path, final_content.clone()).await {
285                            tracing::error!("Direct write fallback failed for {}: {}", url, e2);
286                        }
287                    }
288                }
289            }
290        }
291
292        tracing::debug!("Updated local file {}", url);
293    }
294
295    Ok(())
296}
297
298async fn ensure_dir_path(path: &std::path::Path) {
299    let mut current = std::path::PathBuf::new();
300    for component in path.components() {
301        current.push(component);
302        if current.exists() {
303            if current.is_file() {
304                let mut new_name = current.clone();
305                new_name.set_extension("txt");
306                tracing::warn!(
307                    "[BraidFS] Path conflict: {:?} is a file but needs to be a directory. Renaming to {:?}",
308                    current,
309                    new_name
310                );
311                if let Err(e) = tokio::fs::rename(&current, &new_name).await {
312                    tracing::error!("[BraidFS] Failed to resolve path conflict: {}", e);
313                    continue;
314                }
315                // Now create the directory
316                if let Err(e) = tokio::fs::create_dir(&current).await {
317                    tracing::error!("[BraidFS] Failed to create directory after rename: {}", e);
318                }
319            }
320        } else {
321            if let Err(e) = tokio::fs::create_dir(&current).await {
322                // It might have been created by another task in the meantime
323                if e.kind() != std::io::ErrorKind::AlreadyExists {
324                    tracing::error!("[BraidFS] Failed to create directory {:?}: {}", current, e);
325                }
326            }
327        }
328    }
329}