braid_core/fs/
subscription.rs1use super::PEER_ID;
2use crate::core::BraidRequest;
3use crate::core::Result;
4use crate::fs::mapping;
5use crate::fs::state::DaemonState;
6use std::collections::HashMap;
7
8pub async fn spawn_subscription(
9 url: String,
10 subscriptions: &mut HashMap<String, tokio::task::JoinHandle<()>>,
11 state: DaemonState,
12) {
13 if subscriptions.contains_key(&url) {
14 return;
15 }
16
17 if !state
18 .config
19 .read()
20 .await
21 .sync
22 .get(&url)
23 .cloned()
24 .unwrap_or(false)
25 {
26 return;
27 }
28
29 let url_capture = url.clone();
30 let state_capture = state.clone();
31 let handle = tokio::spawn(async move {
32 loop {
33 match subscribe_loop(url_capture.clone(), state_capture.clone()).await {
34 Ok(_) => {
35 tracing::info!(
36 "Subscription for {} ended normally (disconnect). Retrying in 5s...",
37 url_capture
38 );
39 }
40 Err(e) => {
41 tracing::error!(
42 "Subscription error for {}: {}. Retrying in 5s...",
43 url_capture,
44 e
45 );
46 }
47 }
48 tokio::time::sleep(std::time::Duration::from_secs(5)).await;
49 }
50 });
51
52 subscriptions.insert(url, handle);
53}
54
55pub async fn subscribe_loop(url: String, state: DaemonState) -> Result<()> {
56 tracing::info!("Subscribing to {}", url);
57
58 let mut req = BraidRequest::new()
59 .subscribe()
60 .with_header("Accept", "text/plain")
61 .with_header("Heartbeats", "30s");
62
63 if url.contains("braid.org") {
66 req = req.with_merge_type("dt");
67 }
68
69 if let Ok(u) = url::Url::parse(&url) {
71 if let Some(domain) = u.domain() {
72 let cfg = state.config.read().await;
73 if let Some(token) = cfg.cookies.get(domain) {
74 req = req.with_header("Authorization", format!("Bearer {}", token));
75 let cookie_str = if token.contains('=') {
76 token.clone()
77 } else {
78 format!("token={}", token)
79 };
80 req = req.with_header("Cookie", cookie_str);
81 }
82 }
83 }
84
85 let mut sub = state.client.subscribe(&url, req).await?;
86 let mut is_first = true;
87
88 while let Some(update) = sub.next().await {
89 let update = update?;
90
91 if update.status == 309 {
93 tracing::warn!(
94 "[BraidFS] Reborn (309) detected during subscription for {}. History reset.",
95 url
96 );
97 is_first = true;
98 }
99
100 if let Some(v) = update.primary_version() {
102 let v_str = v.to_string();
103 let my_id = PEER_ID.read().await;
104 if !is_first && v_str.contains(&*my_id) {
105 tracing::debug!(
106 "[BraidFS] Ignoring echo from {} (version matches our PEER_ID {})",
107 url,
108 *my_id
109 );
110 continue;
111 }
112 }
113 is_first = false;
114
115 tracing::debug!("Received update from {}: {:?}", url, update.version);
116
117 {
119 state.tracker.mark(&url);
120 let mut store = state.version_store.write().await;
121 store.update(&url, update.version.clone(), update.parents.clone());
122 let _ = store.save().await;
123 }
124
125 let patches = match update.patches.as_ref() {
126 Some(p) if !p.is_empty() => p,
127 _ => {
128 if let Some(body) = update.body_str() {
130 tracing::info!(
131 "[BraidFS-Sub] Snapshot for {}, writing {} bytes",
132 url,
133 body.len()
134 );
135
136 let raw_content = {
138 let mut merges = state.active_merges.write().await;
139 let peer_id = PEER_ID.read().await.clone();
140 let requested_merge_type =
141 update.merge_type.as_deref().unwrap_or("diamond");
142 let merge = merges.entry(url.clone()).or_insert_with(|| {
143 tracing::info!(
144 "[BraidFS] Creating merge state for {} with type: {}",
145 url,
146 requested_merge_type
147 );
148 let mut m = state
149 .merge_registry
150 .create(requested_merge_type, &peer_id)
151 .or_else(|| state.merge_registry.create("diamond", &peer_id))
152 .expect("Failed to create merge type");
153 m.initialize(body);
154 m
155 });
156
157 let patch = crate::core::merge::MergePatch {
158 range: "".to_string(),
159 content: serde_json::Value::String(body.to_string()),
160 version: update.primary_version().map(|v| v.to_string()),
161 parents: update.parents.iter().map(|p| p.to_string()).collect(),
162 };
163 merge.apply_patch(patch);
164 merge.get_content()
165 };
166
167 let final_content = if raw_content.trim().starts_with("<!DOCTYPE")
169 || raw_content.trim().starts_with("<html")
170 {
171 mapping::extract_markdown(&raw_content)
172 } else {
173 raw_content
174 };
175
176 {
178 let mut cache = state.content_cache.write().await;
179 cache.insert(url.clone(), final_content.clone());
180 }
181
182 if let Ok(path) = mapping::url_to_path(&url) {
183 state.pending.add(path.clone());
185
186 if let Some(parent) = path.parent() {
187 ensure_dir_path(parent).await;
188 }
189
190 let tmp_path = path.with_extension("tmp");
192 if let Err(e) = tokio::fs::write(&tmp_path, final_content.clone()).await {
193 tracing::error!("Failed to write tmp file for snapshot {}: {}", url, e);
194 } else {
195 match tokio::fs::rename(&tmp_path, &path).await {
196 Ok(_) => {
197 let mut cache = state.content_cache.write().await;
199 cache.insert(url.clone(), final_content.clone());
200 }
201 Err(e) => {
202 tracing::error!("Failed to rename tmp file for snapshot {}: {} (fallback direct)", url, e);
203 if let Err(e2) = tokio::fs::write(&path, final_content).await {
204 tracing::error!(
205 "Direct snapshot write failed for {}: {}",
206 url,
207 e2
208 );
209 }
210 }
211 }
212 }
213 }
214 }
215 continue;
216 }
217 };
218
219 let path = mapping::url_to_path(&url)?;
220
221 let content = if path.exists() {
222 tokio::fs::read_to_string(&path).await.unwrap_or_default()
223 } else {
224 String::new()
225 };
226
227 let final_content = {
229 let mut merges = state.active_merges.write().await;
230 let peer_id = PEER_ID.read().await.clone();
231 let requested_merge_type = update.merge_type.as_deref().unwrap_or("diamond");
232 let merge = merges.entry(url.clone()).or_insert_with(|| {
233 tracing::info!(
234 "[BraidFS] Creating merge state for {} with type: {}",
235 url,
236 requested_merge_type
237 );
238 let mut m = state
239 .merge_registry
240 .create(requested_merge_type, &peer_id)
241 .or_else(|| state.merge_registry.create("diamond", &peer_id))
242 .expect("Failed to create merge type");
243 m.initialize(&content);
244 m
245 });
246
247 for patch in patches {
248 let patch_content = std::str::from_utf8(&patch.content).unwrap_or("");
249
250 let merge_patch = crate::core::merge::MergePatch {
251 range: patch.range.clone(),
252 content: serde_json::Value::String(patch_content.to_string()),
253 version: update.primary_version().map(|v| v.to_string()),
254 parents: update.parents.iter().map(|p| p.to_string()).collect(),
255 };
256 merge.apply_patch(merge_patch);
257 }
258 merge.get_content()
259 };
260
261 if let Ok(path) = mapping::url_to_path(&url) {
262 state.pending.add(path.clone());
264
265 if let Some(parent) = path.parent() {
266 ensure_dir_path(parent).await;
267 }
268
269 let tmp_path = path.with_extension("tmp");
272 if let Err(e) = tokio::fs::write(&tmp_path, final_content.clone()).await {
273 tracing::error!("Failed to write tmp file for {}: {}", url, e);
274 } else {
275 match tokio::fs::rename(&tmp_path, &path).await {
276 Ok(_) => {
277 let mut cache = state.content_cache.write().await;
279 cache.insert(url.clone(), final_content);
280 }
281 Err(e) => {
282 tracing::error!("Failed to rename tmp file for {}: {} (attempting direct write fallback)", url, e);
283 if let Err(e2) = tokio::fs::write(&path, final_content.clone()).await {
285 tracing::error!("Direct write fallback failed for {}: {}", url, e2);
286 }
287 }
288 }
289 }
290 }
291
292 tracing::debug!("Updated local file {}", url);
293 }
294
295 Ok(())
296}
297
298async fn ensure_dir_path(path: &std::path::Path) {
299 let mut current = std::path::PathBuf::new();
300 for component in path.components() {
301 current.push(component);
302 if current.exists() {
303 if current.is_file() {
304 let mut new_name = current.clone();
305 new_name.set_extension("txt");
306 tracing::warn!(
307 "[BraidFS] Path conflict: {:?} is a file but needs to be a directory. Renaming to {:?}",
308 current,
309 new_name
310 );
311 if let Err(e) = tokio::fs::rename(¤t, &new_name).await {
312 tracing::error!("[BraidFS] Failed to resolve path conflict: {}", e);
313 continue;
314 }
315 if let Err(e) = tokio::fs::create_dir(¤t).await {
317 tracing::error!("[BraidFS] Failed to create directory after rename: {}", e);
318 }
319 }
320 } else {
321 if let Err(e) = tokio::fs::create_dir(¤t).await {
322 if e.kind() != std::io::ErrorKind::AlreadyExists {
324 tracing::error!("[BraidFS] Failed to create directory {:?}: {}", current, e);
325 }
326 }
327 }
328 }
329}