Skip to main content

wire/
config.rs

//! On-disk state for `wire`.
//!
//! Layout:
//!   `$XDG_CONFIG_HOME/wire/` (defaults to `~/.config/wire/`)
//!     - `private.key`     — 32-byte raw Ed25519 seed (mode 0600)
//!     - `agent-card.json` — signed self-card (mode 0644, public)
//!     - `trust.json`      — pinned peers + tiers
//!     - `config.toml`     — relay URL, body cap, etc. (created lazily)
//!     - `relay.json`      — relay slot bindings incl. slot tokens (mode 0600),
//!                           serialized through the `relay.lock` flock file
//!     - `display.json`    — local-only nickname/emoji overrides
//!     - `op.key`, `op.json`, `orgs/<did>.key`, `memberships.json`
//!                         — RFC-001 operator / organization identity
//!
//!   `$XDG_STATE_HOME/wire/` (defaults to `~/.local/state/wire/`)
//!     - `inbox/<peer>.jsonl`         — verified inbound events
//!     - `outbox/<peer>.jsonl`        — agent-appended outbound events (daemon flushes)
//!     - `outbox/<peer>.pushed.jsonl` — push lifecycle log (`queued → pushed`)
//!     - `spool/`                     — daemon-internal staging
//!
//! All paths are configurable via the `WIRE_HOME` env var, which overrides
//! both roots to `$WIRE_HOME/config/wire/` and `$WIRE_HOME/state/wire/`.
//! The test harness uses it to keep tests isolated from the operator's real
//! config.
18
19use anyhow::{Context, Result, anyhow};
20use serde_json::Value;
21use std::collections::HashMap;
22use std::fs;
23use std::io::Write;
24use std::path::{Path, PathBuf};
25use std::sync::{Arc, Mutex, OnceLock};
26
27/// Root configuration directory. Honors `WIRE_HOME` for testing.
28///
29/// With `WIRE_HOME=/tmp/foo`, returns `/tmp/foo/config/wire`.
30/// Without it, returns the XDG default (e.g. `~/.config/wire/`).
31pub fn config_dir() -> Result<PathBuf> {
32    if let Ok(home) = std::env::var("WIRE_HOME") {
33        return Ok(PathBuf::from(home).join("config").join("wire"));
34    }
35    dirs::config_dir()
36        .map(|d| d.join("wire"))
37        .ok_or_else(|| anyhow!("could not resolve XDG_CONFIG_HOME — set WIRE_HOME"))
38}
39
40/// Root state directory (rotating data — inbox/outbox/spool).
41///
42/// With `WIRE_HOME=/tmp/foo`, returns `/tmp/foo/state/wire`.
43pub fn state_dir() -> Result<PathBuf> {
44    if let Ok(home) = std::env::var("WIRE_HOME") {
45        return Ok(PathBuf::from(home).join("state").join("wire"));
46    }
47    dirs::state_dir()
48        .or_else(dirs::data_local_dir)
49        .map(|d| d.join("wire"))
50        .ok_or_else(|| anyhow!("could not resolve XDG_STATE_HOME — set WIRE_HOME"))
51}
52
53pub fn private_key_path() -> Result<PathBuf> {
54    Ok(config_dir()?.join("private.key"))
55}
56pub fn agent_card_path() -> Result<PathBuf> {
57    Ok(config_dir()?.join("agent-card.json"))
58}
59pub fn trust_path() -> Result<PathBuf> {
60    Ok(config_dir()?.join("trust.json"))
61}
62pub fn config_toml_path() -> Result<PathBuf> {
63    Ok(config_dir()?.join("config.toml"))
64}
65pub fn inbox_dir() -> Result<PathBuf> {
66    Ok(state_dir()?.join("inbox"))
67}
68pub fn outbox_dir() -> Result<PathBuf> {
69    Ok(state_dir()?.join("outbox"))
70}
71
/// Process-wide registry mapping each outbox-style file path to its own mutex.
///
/// Appenders lock the per-path mutex so that concurrent `wire_send` calls
/// (e.g. several agents driving one MCP server) cannot interleave bytes
/// mid-line. POSIX `O_APPEND` is atomic only for writes ≤ PIPE_BUF
/// (typically 4096 bytes), and a wire event can be far larger (the
/// per-event cap is 256 KiB).
///
/// **Inter-process scope (CLI vs MCP-server vs daemon):** v0.1 takes no
/// OS-level flock. The daemon only reads the outbox plus a cursor file, and
/// concurrent CLI `wire send` invocations against a running MCP server are
/// rare enough that the risk is accepted for now. v0.2 BACKLOG: switch to
/// `fs2::FileExt::lock_exclusive` for cross-process safety.
static OUTBOX_LOCKS: OnceLock<Mutex<HashMap<PathBuf, Arc<Mutex<()>>>>> = OnceLock::new();

/// Return the mutex guarding appends to `path`, creating it on first use.
///
/// Equal paths always yield handles to the same mutex; entries are never
/// evicted from the registry.
fn outbox_lock(path: &Path) -> Arc<Mutex<()>> {
    let mut registry = OUTBOX_LOCKS
        .get_or_init(Default::default)
        .lock()
        .expect("OUTBOX_LOCKS poisoned");
    Arc::clone(registry.entry(path.to_path_buf()).or_default())
}
92
93/// Append a single JSONL record to the outbox for `peer`, holding the
94/// per-path mutex to keep concurrent appenders from interleaving lines.
95///
96/// `record_bytes` should be the full canonical JSON of the signed event,
97/// without trailing newline (the helper appends it). All bytes are written
98/// in one `write_all` while the lock is held.
99///
100/// The `peer` arg is normalized to its bare handle (`bob@relay.example` →
101/// `bob`) so the outbox filename is always `<bare_handle>.jsonl`. This is
102/// the canonical form the push enumerator and daemon reader expect; the
103/// normalization at this chokepoint guarantees correctness for every
104/// future caller, even if they forget to `bare_handle()` first. The
105/// original silent-fail of v0.5.11 was a caller that passed the FQDN
106/// form (issue #2 — 25-minute message-loss incident, surface fix in
107/// v0.5.13). This defense-in-depth makes the on-disk contract self-
108/// enforcing instead of caller-policed.
109/// v0.14.2 (#162 fix #2): append a "pushed" record to the per-peer
110/// lifecycle log when `run_sync_push` confirms a relay POST landed
111/// (either as `ok` or as the idempotent `duplicate` — the relay has the
112/// event either way). The log sits next to the outbox JSONL at
113/// `<outbox_dir>/<peer>.pushed.jsonl` and carries one
114/// `{"ts":"...","event_id":"..."}` line per push.
115///
116/// Readers (`tool_status`, `wire_status` CLI, `wire_tail` lifecycle
117/// surface) join outbox events to this log by `event_id` to expose the
118/// `queued → pushed` lifecycle that fix #2 surfaces.
119///
120/// NOT pruned in v0.14.2. The log grows monotonically; for high-volume
121/// operators a v0.15+ pruner (entries older than `<config_dir>/lifecycle_retention_days`)
122/// is tracked at the issue. Best-effort: errors log but don't abort
123/// the daemon push loop — a wedged disk shouldn't kill sync.
124pub fn append_pushed_log(peer: &str, event_id: &str, ts: &str) -> Result<PathBuf> {
125    ensure_dirs()?;
126    let normalized = crate::agent_card::bare_handle(peer);
127    let path = outbox_dir()?.join(format!("{normalized}.pushed.jsonl"));
128    let lock = outbox_lock(&path);
129    let _g = lock.lock().expect("pushed-log per-path mutex poisoned");
130    let mut f = fs::OpenOptions::new()
131        .create(true)
132        .append(true)
133        .open(&path)
134        .with_context(|| format!("opening pushed-log {path:?}"))?;
135    let line = serde_json::to_string(&serde_json::json!({
136        "ts": ts,
137        "event_id": event_id,
138    }))?;
139    f.write_all(line.as_bytes())
140        .with_context(|| format!("appending to {path:?}"))?;
141    f.write_all(b"\n")?;
142    Ok(path)
143}
144
145/// Total queued-but-not-yet-pushed events across all peers. Walks
146/// each per-peer outbox file, counts event_ids missing from the
147/// per-peer pushed log. Cheap (one disk read per peer) and bounded by
148/// `trust.agents`.
149///
150/// v0.14.2 (#162 fix #2): the diagnostic for the "silent send" class —
151/// `pending_push_count > 0` + `stale_sync` = events queued, daemon not
152/// pushing. Was originally inline in `tool_status`; extracted so the
153/// CLI `wire status` surface and any future doctor/web check stay in
154/// agreement by construction.
155pub fn compute_pending_push_count() -> u64 {
156    compute_pending_push_breakdown()
157        .iter()
158        .map(|p| p.count)
159        .sum()
160}
161
162/// Per-peer breakdown of queued-but-not-pushed events. Populates
163/// the new `daemon.pending_push_breakdown` field in `wire status`
164/// and the human-readable expansion of the "pending push:" line.
165///
166/// Each entry carries the peer handle, the trust tier (so the
167/// surface can say "stuck on orchid-savanna (PENDING_ACK — pair
168/// never completed)"), and the unpushed event count.
169///
170/// **Why tier?** A peer at `PENDING_ACK` has events queued that
171/// won't push until pair-accept completes (a #166-class wedge).
172/// A peer at `VERIFIED` with events queued + `stale_sync` is the
173/// #162 silent-send class. Operators need the tier to know which
174/// path to fix.
175#[derive(Debug, Clone, serde::Serialize)]
176pub struct PendingPushPerPeer {
177    pub peer: String,
178    pub tier: String,
179    pub count: u64,
180}
181
182pub fn compute_pending_push_breakdown() -> Vec<PendingPushPerPeer> {
183    let trust = match read_trust() {
184        Ok(t) => t,
185        Err(_) => return Vec::new(),
186    };
187    let agents = match trust.get("agents").and_then(serde_json::Value::as_object) {
188        Some(a) => a.clone(),
189        None => return Vec::new(),
190    };
191    // Read relay_state once so the effective-tier lookup doesn't
192    // hammer the disk per peer. Missing file → empty peers map; the
193    // effective_tier helper handles that case fine.
194    let relay_state = read_relay_state().unwrap_or_else(|_| serde_json::json!({"peers": {}}));
195    let mut out: Vec<PendingPushPerPeer> = Vec::new();
196    for (peer_handle, _agent) in agents.iter() {
197        let pushed_ids = read_pushed_event_ids(peer_handle);
198        let outbox_path = match outbox_dir() {
199            Ok(d) => d.join(format!("{peer_handle}.jsonl")),
200            Err(_) => continue,
201        };
202        let body = match fs::read_to_string(&outbox_path) {
203            Ok(b) => b,
204            Err(_) => continue,
205        };
206        let mut count: u64 = 0;
207        for line in body.lines() {
208            if let Some(eid) = serde_json::from_str::<serde_json::Value>(line)
209                .ok()
210                .and_then(|v| {
211                    v.get("event_id")
212                        .and_then(serde_json::Value::as_str)
213                        .map(str::to_string)
214                })
215                && !pushed_ids.contains(&eid)
216            {
217                count += 1;
218            }
219        }
220        if count > 0 {
221            // Use effective tier (relay_state-aware) — daemon
222            // can't push to a peer with no slot_token even if
223            // trust.json says VERIFIED, and the PENDING_ACK hint
224            // is the actionable answer for that case.
225            let tier = crate::trust::effective_tier(&trust, &relay_state, peer_handle);
226            out.push(PendingPushPerPeer {
227                peer: peer_handle.clone(),
228                tier,
229                count,
230            });
231        }
232    }
233    // Stable, deterministic order — largest backlog first, peer name
234    // as tiebreak. JSON consumers + the human line both rely on it.
235    out.sort_by(|a, b| b.count.cmp(&a.count).then_with(|| a.peer.cmp(&b.peer)));
236    out
237}
238
239/// Read `$WIRE_HOME/state/wire/stream_state.json` written by the
240/// daemon's SSE subscriber. `Value::Null` when the file is absent or
241/// unreadable — callers should treat that as "stream subscriber
242/// hasn't reported in yet" (cold start, or daemon predates #168).
243pub fn read_stream_state() -> serde_json::Value {
244    state_dir()
245        .ok()
246        .and_then(|d| fs::read_to_string(d.join("stream_state.json")).ok())
247        .and_then(|body| serde_json::from_str::<serde_json::Value>(&body).ok())
248        .unwrap_or(serde_json::Value::Null)
249}
250
/// Whether the last sync falls outside the 60-second freshness window.
///
/// `None` (never synced here) always counts as stale. MCP and CLI share this
/// helper so the flag flips at the same moment in both surfaces.
pub fn stale_sync(last_sync_age_seconds: Option<u64>) -> bool {
    const FRESHNESS_WINDOW_SECS: u64 = 60;
    !matches!(last_sync_age_seconds, Some(age) if age <= FRESHNESS_WINDOW_SECS)
}
260
261/// Read the set of event_ids already recorded as pushed for `peer`.
262/// Cheap (single file read + parse); callers that need bulk lifecycle
263/// data should read the file directly. Returns an empty set on
264/// missing/unreadable file.
265pub fn read_pushed_event_ids(peer: &str) -> std::collections::HashSet<String> {
266    let normalized = crate::agent_card::bare_handle(peer);
267    let path = match outbox_dir() {
268        Ok(d) => d.join(format!("{normalized}.pushed.jsonl")),
269        Err(_) => return std::collections::HashSet::new(),
270    };
271    let body = match fs::read_to_string(&path) {
272        Ok(b) => b,
273        Err(_) => return std::collections::HashSet::new(),
274    };
275    body.lines()
276        .filter_map(|line| {
277            serde_json::from_str::<serde_json::Value>(line)
278                .ok()?
279                .get("event_id")?
280                .as_str()
281                .map(str::to_string)
282        })
283        .collect()
284}
285
286pub fn append_outbox_record(peer: &str, record_bytes: &[u8]) -> Result<PathBuf> {
287    ensure_dirs()?;
288    let normalized = crate::agent_card::bare_handle(peer);
289    let path = outbox_dir()?.join(format!("{normalized}.jsonl"));
290    let lock = outbox_lock(&path);
291    let _g = lock.lock().expect("outbox per-path mutex poisoned");
292    let mut f = fs::OpenOptions::new()
293        .create(true)
294        .append(true)
295        .open(&path)
296        .with_context(|| format!("opening outbox {path:?}"))?;
297    let mut buf = Vec::with_capacity(record_bytes.len() + 1);
298    buf.extend_from_slice(record_bytes);
299    buf.push(b'\n');
300    f.write_all(&buf)
301        .with_context(|| format!("appending to {path:?}"))?;
302    Ok(path)
303}
304
305/// Whether `wire init` has already been run (private key + card both present).
306pub fn is_initialized() -> Result<bool> {
307    Ok(private_key_path()?.exists() && agent_card_path()?.exists())
308}
309
310/// Create directory tree with restrictive permissions on the config dir.
311pub fn ensure_dirs() -> Result<()> {
312    let cfg = config_dir()?;
313    fs::create_dir_all(&cfg).with_context(|| format!("creating {cfg:?}"))?;
314    fs::create_dir_all(state_dir()?)?;
315    fs::create_dir_all(inbox_dir()?)?;
316    fs::create_dir_all(outbox_dir()?)?;
317    set_dir_mode_0700(&cfg)?;
318    Ok(())
319}
320
321#[cfg(unix)]
322fn set_dir_mode_0700(path: &Path) -> Result<()> {
323    use std::os::unix::fs::PermissionsExt;
324    let mut perms = fs::metadata(path)?.permissions();
325    perms.set_mode(0o700);
326    fs::set_permissions(path, perms)?;
327    Ok(())
328}
329
330#[cfg(not(unix))]
331fn set_dir_mode_0700(_: &Path) -> Result<()> {
332    Ok(())
333}
334
335/// Write a private key file with mode 0600.
336pub fn write_private_key(seed: &[u8; 32]) -> Result<()> {
337    let path = private_key_path()?;
338    fs::write(&path, seed).with_context(|| format!("writing {path:?}"))?;
339    set_file_mode_0600(&path)?;
340    Ok(())
341}
342
343#[cfg(unix)]
344fn set_file_mode_0600(path: &Path) -> Result<()> {
345    use std::os::unix::fs::PermissionsExt;
346    let mut perms = fs::metadata(path)?.permissions();
347    perms.set_mode(0o600);
348    fs::set_permissions(path, perms)?;
349    Ok(())
350}
351
352#[cfg(not(unix))]
353fn set_file_mode_0600(_: &Path) -> Result<()> {
354    Ok(())
355}
356
357/// Read the saved private key seed (32 bytes).
358pub fn read_private_key() -> Result<[u8; 32]> {
359    let path = private_key_path()?;
360    let bytes = fs::read(&path).with_context(|| format!("reading {path:?}"))?;
361    if bytes.len() != 32 {
362        return Err(anyhow!(
363            "private key file has wrong length ({} != 32)",
364            bytes.len()
365        ));
366    }
367    let mut seed = [0u8; 32];
368    seed.copy_from_slice(&bytes);
369    Ok(seed)
370}
371
372// ── RFC-001 operator / organization key storage ───────────────────────────
373// Operator + org root private keys live alongside the session `private.key`,
374// same 0600 raw-32-byte-seed convention. These anchor the offline identity
375// layer's `op_did` / `org_did` (each DID commits to its key).
376
377pub fn op_key_path() -> Result<PathBuf> {
378    Ok(config_dir()?.join("op.key"))
379}
380
/// Map a DID to a filename-safe token.
///
/// ASCII alphanumerics and `-` pass through unchanged. Every other
/// character becomes `_`, including `:`, `.`, `/`, `_`, and any non-ASCII
/// char. No path separators or dots can survive into the result.
///
/// NOTE(review): the mapping is lossy. For example, `did:x:a_b` and
/// `did:x:a:b` both become `did_x_a_b`, so two such DIDs would share one
/// key file. It is left as-is because changing it would orphan key files
/// already on disk.
fn did_filename(did: &str) -> String {
    let mut name = String::with_capacity(did.len());
    for ch in did.chars() {
        let keep = ch.is_ascii_alphanumeric() || ch == '-';
        name.push(if keep { ch } else { '_' });
    }
    name
}
393
394pub fn org_key_path(org_did: &str) -> Result<PathBuf> {
395    Ok(config_dir()?
396        .join("orgs")
397        .join(format!("{}.key", did_filename(org_did))))
398}
399
400fn write_seed_0600(path: &Path, seed: &[u8; 32]) -> Result<()> {
401    if let Some(parent) = path.parent() {
402        fs::create_dir_all(parent)?;
403    }
404    fs::write(path, seed).with_context(|| format!("writing {path:?}"))?;
405    set_file_mode_0600(path)?;
406    Ok(())
407}
408
409fn read_seed(path: &Path) -> Result<[u8; 32]> {
410    let bytes = fs::read(path).with_context(|| format!("reading {path:?}"))?;
411    if bytes.len() != 32 {
412        return Err(anyhow!(
413            "key file {path:?} has wrong length ({} != 32)",
414            bytes.len()
415        ));
416    }
417    let mut seed = [0u8; 32];
418    seed.copy_from_slice(&bytes);
419    Ok(seed)
420}
421
422pub fn write_op_key(seed: &[u8; 32]) -> Result<()> {
423    write_seed_0600(&op_key_path()?, seed)
424}
425pub fn read_op_key() -> Result<[u8; 32]> {
426    read_seed(&op_key_path()?)
427}
428pub fn write_org_key(org_did: &str, seed: &[u8; 32]) -> Result<()> {
429    write_seed_0600(&org_key_path(org_did)?, seed)
430}
431pub fn read_org_key(org_did: &str) -> Result<[u8; 32]> {
432    read_seed(&org_key_path(org_did)?)
433}
434
435pub fn op_meta_path() -> Result<PathBuf> {
436    Ok(config_dir()?.join("op.json"))
437}
438
439/// Persist the operator handle chosen at `wire enroll op`. The op_did derives
440/// from handle + op key; card-emit re-derives it at card-build time.
441pub fn write_op_handle(handle: &str) -> Result<()> {
442    let path = op_meta_path()?;
443    if let Some(p) = path.parent() {
444        fs::create_dir_all(p)?;
445    }
446    fs::write(
447        &path,
448        serde_json::to_vec_pretty(&serde_json::json!({ "handle": handle }))?,
449    )?;
450    set_file_mode_0600(&path)?;
451    Ok(())
452}
453
454pub fn read_op_handle() -> Result<Option<String>> {
455    let Ok(bytes) = fs::read(op_meta_path()?) else {
456        return Ok(None);
457    };
458    let v: Value = serde_json::from_slice(&bytes)?;
459    Ok(v.get("handle").and_then(Value::as_str).map(str::to_string))
460}
461
462pub fn memberships_path() -> Result<PathBuf> {
463    Ok(config_dir()?.join("memberships.json"))
464}
465
466/// Append an org membership the operator holds (org_did / org_pubkey /
467/// member_cert) for card-emit to attach. Replaces any existing entry for the
468/// same org_did (re-issued certs supersede).
469pub fn add_membership(org_did: &str, org_pubkey: &str, member_cert: &str) -> Result<()> {
470    let mut list = read_memberships()?;
471    list.retain(|m| m.get("org_did").and_then(Value::as_str) != Some(org_did));
472    list.push(serde_json::json!({
473        "org_did": org_did, "org_pubkey": org_pubkey, "member_cert": member_cert
474    }));
475    let path = memberships_path()?;
476    if let Some(p) = path.parent() {
477        fs::create_dir_all(p)?;
478    }
479    fs::write(&path, serde_json::to_vec_pretty(&Value::Array(list))?)?;
480    Ok(())
481}
482
483/// Read the operator's stored org memberships (empty if none/malformed).
484pub fn read_memberships() -> Result<Vec<Value>> {
485    let Ok(bytes) = fs::read(memberships_path()?) else {
486        return Ok(vec![]);
487    };
488    Ok(serde_json::from_slice::<Value>(&bytes)
489        .ok()
490        .and_then(|v| v.as_array().cloned())
491        .unwrap_or_default())
492}
493
494pub fn write_agent_card(card: &Value) -> Result<()> {
495    let path = agent_card_path()?;
496    let body = serde_json::to_vec_pretty(card)?;
497    // v0.7.0-alpha.8 (review-fix #7): atomic write via tmp+rename so
498    // a power-loss / SIGKILL mid-write doesn't leave a 0-byte agent-
499    // card that `is_initialized()` claims is fine but `read_agent_card`
500    // can't parse. `cmd_identity_rename` made this a hot path; the
501    // pre-existing fs::write pattern was a corruption risk every call.
502    let tmp = path.with_extension("json.tmp");
503    fs::write(&tmp, body).with_context(|| format!("writing tmp {tmp:?}"))?;
504    fs::rename(&tmp, &path).with_context(|| format!("atomic rename {tmp:?} → {path:?}"))?;
505    Ok(())
506}
507
508pub fn read_agent_card() -> Result<Value> {
509    let path = agent_card_path()?;
510    let body = fs::read(&path).with_context(|| format!("reading {path:?}"))?;
511    Ok(serde_json::from_slice(&body)?)
512}
513
514// ---------- display overrides (v0.7.0-alpha.3) ----------
515
516/// Path to `display.json` — operator-chosen character nickname + emoji
517/// override. Sidecar to agent-card. NOT signed (display-only, local-only).
518///
519/// Format: `{"nickname": "foxtrot-meadow", "emoji": "🦊"}` — both fields
520/// optional, omitted means use the auto-derived value.
521pub fn display_overrides_path() -> Result<PathBuf> {
522    Ok(config_dir()?.join("display.json"))
523}
524
525#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
526pub struct DisplayOverrides {
527    #[serde(default, skip_serializing_if = "Option::is_none")]
528    pub nickname: Option<String>,
529    #[serde(default, skip_serializing_if = "Option::is_none")]
530    pub emoji: Option<String>,
531}
532
533pub fn read_display_overrides() -> Result<DisplayOverrides> {
534    read_display_overrides_at(&display_overrides_path()?)
535}
536
537pub fn read_display_overrides_at(path: &Path) -> Result<DisplayOverrides> {
538    if !path.exists() {
539        return Ok(DisplayOverrides::default());
540    }
541    let body = fs::read(path).with_context(|| format!("reading {path:?}"))?;
542    Ok(serde_json::from_slice(&body)?)
543}
544
545pub fn write_display_overrides(overrides: &DisplayOverrides) -> Result<()> {
546    let path = display_overrides_path()?;
547    if let Some(parent) = path.parent() {
548        fs::create_dir_all(parent).with_context(|| format!("creating {parent:?}"))?;
549    }
550    let body = serde_json::to_vec_pretty(overrides)?;
551    // v0.7.0-alpha.8 (review-fix #7): atomic write — consistent with
552    // write_agent_card now that they share the cmd_identity_rename
553    // call path.
554    let tmp = path.with_extension("json.tmp");
555    fs::write(&tmp, body).with_context(|| format!("writing tmp {tmp:?}"))?;
556    fs::rename(&tmp, &path).with_context(|| format!("atomic rename {tmp:?} → {path:?}"))?;
557    Ok(())
558}
559
560pub fn write_trust(trust: &Value) -> Result<()> {
561    let path = trust_path()?;
562    let body = serde_json::to_vec_pretty(trust)?;
563    fs::write(&path, body).with_context(|| format!("writing {path:?}"))?;
564    Ok(())
565}
566
567pub fn read_trust() -> Result<Value> {
568    let path = trust_path()?;
569    if !path.exists() {
570        return Ok(crate::trust::empty_trust());
571    }
572    let body = fs::read(&path).with_context(|| format!("reading {path:?}"))?;
573    Ok(serde_json::from_slice(&body)?)
574}
575
576// ---------- relay binding state ----------
577
578/// Path to `relay.json` — holds our own slot binding and pinned peer slots.
579/// Contains slot-tokens, so always written mode 0600.
580pub fn relay_state_path() -> Result<PathBuf> {
581    Ok(config_dir()?.join("relay.json"))
582}
583
584pub fn read_relay_state() -> Result<Value> {
585    let path = relay_state_path()?;
586    if !path.exists() {
587        return Ok(serde_json::json!({"self": Value::Null, "peers": {}}));
588    }
589    let body = fs::read(&path).with_context(|| format!("reading {path:?}"))?;
590    Ok(serde_json::from_slice(&body)?)
591}
592
593/// Atomic, lock-serialized write of the full relay-state. Every direct caller
594/// (foreground `wire dial`, the background daemon, MCP) funnels through here,
595/// so a foreground write can neither TEAR nor lost-update against the daemon.
596/// Holds the same `relay.lock` flock as [`update_relay_state`] and writes via
597/// tmp+rename.
598///
599/// Bug #3 (v0.13.2): the old raw `fs::write` here was non-atomic and lockless.
600/// A foreground `wire dial` and the daemon both rewrote `relay.json`
601/// concurrently, interleaving bytes and leaving trailing garbage ("trailing
602/// characters at line N") that made the file unparseable — breaking all
603/// push/pull until hand-repaired. Surfaced on Windows (file-sharing
604/// semantics make the interleave easy to hit) but the race was cross-platform.
605pub fn write_relay_state(state: &Value) -> Result<()> {
606    use fs2::FileExt;
607    let lock_path = relay_state_lock_path()?;
608    if let Some(parent) = lock_path.parent() {
609        fs::create_dir_all(parent).with_context(|| format!("creating {parent:?}"))?;
610    }
611    let lock_file = fs::OpenOptions::new()
612        .create(true)
613        .truncate(false)
614        .read(true)
615        .write(true)
616        .open(&lock_path)
617        .with_context(|| format!("opening {lock_path:?}"))?;
618    lock_file
619        .lock_exclusive()
620        .with_context(|| format!("flock {lock_path:?}"))?;
621    let r = write_relay_state_unlocked(state);
622    let _ = fs2::FileExt::unlock(&lock_file);
623    r
624}
625
626/// Atomic relay-state write WITHOUT taking `relay.lock` — the caller must
627/// already hold it (only [`update_relay_state`], which writes inside its own
628/// locked transaction). tmp+rename so a concurrent reader sees either the old
629/// or new whole file, never a partial one.
630fn write_relay_state_unlocked(state: &Value) -> Result<()> {
631    let path = relay_state_path()?;
632    let body = serde_json::to_vec_pretty(state)?;
633    let tmp = path.with_extension("json.tmp");
634    fs::write(&tmp, &body).with_context(|| format!("writing tmp {tmp:?}"))?;
635    set_file_mode_0600(&tmp)?;
636    fs::rename(&tmp, &path).with_context(|| format!("atomic rename {tmp:?} → {path:?}"))?;
637    Ok(())
638}
639
640/// Path to the flock file that serialises concurrent read-modify-write
641/// transactions against `relay.json`. Separate file because flock on the
642/// data file itself races with file replacement (fs::write truncates +
643/// rewrites — atomic-ish but the lock identity disappears).
644fn relay_state_lock_path() -> Result<PathBuf> {
645    Ok(config_dir()?.join("relay.lock"))
646}
647
648/// Atomic read-modify-write against `relay.json`. Holds an exclusive
649/// `fs2::FileExt::lock_exclusive` for the whole transaction so concurrent
650/// `wire` processes (multiple daemons, CLI vs daemon, CLI vs MCP) cannot
651/// race the cursor or peer-pin entries.
652///
653/// P0.3 (0.5.11). Today's debug had three concurrent `wire` processes
654/// (stale 0.2.4 daemon, fresh 0.5.10 daemon, and the CLI) racing the
655/// `self.last_pulled_event_id` cursor — one would advance it past an
656/// event, another would later rewind via stale snapshot. flock makes
657/// that impossible.
658///
659/// Lock timeout: blocks indefinitely (well-behaved processes release in
660/// < 1ms). Use sparingly outside short RMW windows — long holds will
661/// stall every other `wire` process.
662pub fn update_relay_state<F>(modifier: F) -> Result<()>
663where
664    F: FnOnce(&mut Value) -> Result<()>,
665{
666    use fs2::FileExt;
667    let lock_path = relay_state_lock_path()?;
668    if let Some(parent) = lock_path.parent() {
669        fs::create_dir_all(parent).with_context(|| format!("creating {parent:?}"))?;
670    }
671    // Open / create the lock file. Holding a handle keeps the file
672    // alive for the lifetime of the transaction.
673    let lock_file = fs::OpenOptions::new()
674        .create(true)
675        .truncate(false)
676        .read(true)
677        .write(true)
678        .open(&lock_path)
679        .with_context(|| format!("opening {lock_path:?}"))?;
680    lock_file
681        .lock_exclusive()
682        .with_context(|| format!("flock {lock_path:?}"))?;
683
684    // Read fresh state INSIDE the lock — any prior snapshot would be a
685    // race window. Then run the modifier. Then write atomically.
686    let mut state = read_relay_state()?;
687    let result = modifier(&mut state);
688    let write_result = if result.is_ok() {
689        // We already hold relay.lock — use the unlocked writer to avoid
690        // re-acquiring the same flock (which would deadlock).
691        write_relay_state_unlocked(&state)
692    } else {
693        Ok(())
694    };
695    // RAII: drop releases the lock. Explicit unlock for clarity + to
696    // ensure unlock happens even if Drop ordering ever changes.
697    let _ = fs2::FileExt::unlock(&lock_file);
698    result?;
699    write_result?;
700    Ok(())
701}
702
/// Test-only helpers, kept outside the `tests` module so other modules'
/// tests can share the same `WIRE_HOME` isolation.
///
/// Tests run in-process and share process-wide env state, so every
/// `WIRE_HOME` mutator must take [`test_support::ENV_LOCK`] or they race
/// each other.
#[cfg(test)]
pub(crate) mod test_support {
    use std::sync::{Mutex, PoisonError};

    /// Serializes all tests that set or clear `WIRE_HOME`.
    pub static ENV_LOCK: Mutex<()> = Mutex::new(());

    /// Run `f` with `WIRE_HOME` pointed at a fresh random temp directory.
    ///
    /// The variable is cleared and the directory removed afterwards, even
    /// if `f` panics. The panic is then re-raised.
    pub fn with_temp_home<F: FnOnce()>(f: F) {
        // Recover from poison so one failing test doesn't cascade-fail the rest.
        let _guard = ENV_LOCK.lock().unwrap_or_else(PoisonError::into_inner);
        let home = std::env::temp_dir().join(format!("wire-test-{}", rand::random::<u32>()));
        // SAFETY: ENV_LOCK serializes all callers, so no concurrent env access.
        unsafe { std::env::set_var("WIRE_HOME", &home) };
        let _ = std::fs::remove_dir_all(&home);

        let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(f));

        // SAFETY: still under ENV_LOCK, as for `set_var` above.
        unsafe { std::env::remove_var("WIRE_HOME") };
        let _ = std::fs::remove_dir_all(&home);
        if let Err(payload) = outcome {
            std::panic::resume_unwind(payload);
        }
    }
}
728
#[cfg(test)]
mod tests {
    // Unit tests for the on-disk config/state layer. Every test that touches
    // the filesystem runs inside `with_temp_home` so it never sees the
    // operator's real config.
    use super::*;
    use serde_json::json;

    #[test]
    fn did_filename_sanitizes_did_punctuation() {
        assert_eq!(
            did_filename("did:wire:org:slanchaai-abc123"),
            "did_wire_org_slanchaai-abc123"
        );
        // No path-traversal characters survive into the filename. Checked
        // separately so a failure says which character leaked.
        let f = did_filename("did:wire:org:x/../../etc");
        assert!(!f.contains('/'), "slash leaked into sanitized filename {f:?}");
        assert!(!f.contains('.'), "dot leaked into sanitized filename {f:?}");
    }

    #[test]
    fn op_and_org_key_roundtrip() {
        with_temp_home(|| {
            let op_seed = [7u8; 32];
            write_op_key(&op_seed).unwrap();
            assert_eq!(read_op_key().unwrap(), op_seed);

            let org_did = "did:wire:org:slanchaai-deadbeef";
            let org_seed = [9u8; 32];
            write_org_key(org_did, &org_seed).unwrap();
            assert_eq!(read_org_key(org_did).unwrap(), org_seed);
        });
    }

    /// Runs `f` under an isolated `WIRE_HOME` by delegating to
    /// `test_support::with_temp_home`. Per `config_dir_honors_wire_home`,
    /// the resolved config dir lives under a `wire-test-*` directory.
    fn with_temp_home<F: FnOnce()>(f: F) {
        super::test_support::with_temp_home(f)
    }

    #[test]
    fn config_dir_honors_wire_home() {
        with_temp_home(|| {
            let dir = config_dir().unwrap();
            assert!(dir.ends_with("wire"), "got {dir:?}");
            assert!(dir.to_string_lossy().contains("wire-test-"));
        });
    }

    #[test]
    fn ensure_dirs_creates_layout() {
        with_temp_home(|| {
            ensure_dirs().unwrap();
            assert!(config_dir().unwrap().is_dir());
            assert!(state_dir().unwrap().is_dir());
            assert!(inbox_dir().unwrap().is_dir());
            assert!(outbox_dir().unwrap().is_dir());
        });
    }

    #[test]
    fn private_key_roundtrip() {
        with_temp_home(|| {
            ensure_dirs().unwrap();
            let seed = [42u8; 32];
            write_private_key(&seed).unwrap();
            let read_back = read_private_key().unwrap();
            assert_eq!(seed, read_back);
        });
    }

    #[test]
    fn agent_card_roundtrip() {
        with_temp_home(|| {
            ensure_dirs().unwrap();
            let card = json!({"did": "did:wire:paul", "name": "Paul"});
            write_agent_card(&card).unwrap();
            let read_back = read_agent_card().unwrap();
            assert_eq!(card, read_back);
        });
    }

    #[test]
    fn trust_returns_empty_when_missing() {
        with_temp_home(|| {
            ensure_dirs().unwrap();
            let t = read_trust().unwrap();
            assert_eq!(t["version"], 1);
            assert!(t["agents"].is_object());
        });
    }

    #[test]
    fn update_relay_state_writes_through_lock() {
        // P0.3 smoke: update_relay_state runs the modifier and persists the
        // result. Doesn't exercise concurrent flock contention (that needs
        // multi-process orchestration; deferred to an e2e test) but at least
        // proves the happy path works end-to-end through the new lock
        // wrapper.
        with_temp_home(|| {
            ensure_dirs().unwrap();
            // Seed initial state.
            let initial = json!({"self": null, "peers": {}});
            write_relay_state(&initial).unwrap();
            // Run an update.
            super::update_relay_state(|state| {
                state["self"] = json!({
                    "relay_url": "https://test",
                    "slot_id": "abc",
                    "slot_token": "tok",
                });
                Ok(())
            })
            .unwrap();
            // Verify persisted.
            let after = read_relay_state().unwrap();
            assert_eq!(after["self"]["relay_url"], "https://test");
            assert_eq!(after["self"]["slot_id"], "abc");
        });
    }

    #[test]
    fn write_relay_state_never_tears_under_concurrency() {
        // Bug #3 regression: many writers hammering relay.json with
        // alternating long/short bodies. With the old raw fs::write a
        // concurrent reader caught torn bytes ("trailing characters") and
        // failed to parse. The atomic tmp+rename + flock must guarantee every
        // read sees a complete, parseable file. (Threads share one process +
        // WIRE_HOME; the flock serializes them just as it would processes.)
        with_temp_home(|| {
            ensure_dirs().unwrap();
            write_relay_state(&json!({"self": null, "peers": {}})).unwrap();
            let handles: Vec<_> = (0..8)
                .map(|w| {
                    std::thread::spawn(move || {
                        for j in 0..25 {
                            let body = if j % 2 == 0 {
                                json!({"self": {"w": w, "j": j, "pad": "x".repeat(2048)}})
                            } else {
                                json!({"self": {"w": w}})
                            };
                            write_relay_state(&body).unwrap();
                            // Reader must ALWAYS parse — never a torn file.
                            read_relay_state().expect("relay.json must always parse");
                        }
                    })
                })
                .collect();
            for h in handles {
                h.join().unwrap();
            }
            assert!(read_relay_state().unwrap().get("self").is_some());
        });
    }

    #[test]
    fn update_relay_state_modifier_error_does_not_clobber() {
        // P0.3 contract: if the modifier returns Err, the state on disk
        // must NOT be overwritten — partial work shouldn't half-land. The
        // operator's prior state should survive the failed RMW.
        with_temp_home(|| {
            ensure_dirs().unwrap();
            let initial = json!({"self": {"relay_url": "https://prior"}, "peers": {}});
            write_relay_state(&initial).unwrap();
            let result = super::update_relay_state(|state| {
                // Trash the state mid-modifier...
                state["self"] = json!({"relay_url": "https://NEVER_PERSIST"});
                // ...then fail. Write must NOT happen.
                anyhow::bail!("simulated mid-RMW error")
            });
            assert!(result.is_err());
            let after = read_relay_state().unwrap();
            assert_eq!(
                after["self"]["relay_url"], "https://prior",
                "state on disk must not reflect aborted modifier"
            );
        });
    }

    #[test]
    fn is_initialized_true_only_after_both_files_written() {
        with_temp_home(|| {
            ensure_dirs().unwrap();
            assert!(!is_initialized().unwrap());
            write_private_key(&[0u8; 32]).unwrap();
            assert!(!is_initialized().unwrap()); // card still missing
            write_agent_card(&json!({"did": "did:wire:paul"})).unwrap();
            assert!(is_initialized().unwrap());
        });
    }

    #[cfg(unix)]
    #[test]
    fn append_outbox_record_normalizes_fqdn_to_bare_handle() {
        // Regression for issue #2 (v0.5.11 silent-fail): if a caller
        // passes the FQDN form (e.g. `bob@wireup.net`), the file MUST
        // still land at `bob.jsonl` so `wire push` enumerates it.
        with_temp_home(|| {
            let path_fqdn = append_outbox_record("bob@wireup.net", b"{\"kind\":1100}").unwrap();
            let path_bare = append_outbox_record("bob", b"{\"kind\":1100}").unwrap();
            // Both calls must land in the SAME file — the bare handle one.
            assert_eq!(path_fqdn, path_bare, "FQDN form should normalize to bare");
            assert_eq!(
                path_fqdn.file_name().unwrap().to_string_lossy(),
                "bob.jsonl",
                "expected bob.jsonl, got {path_fqdn:?}"
            );
            // And the FQDN-named file MUST NOT exist.
            let outbox = outbox_dir().unwrap();
            assert!(
                !outbox.join("bob@wireup.net.jsonl").exists(),
                "FQDN-named file must not be created"
            );
            // The bare file should have BOTH writes.
            let body = std::fs::read_to_string(&path_bare).unwrap();
            assert_eq!(body.matches("kind").count(), 2, "got: {body}");
        });
    }

    #[test]
    fn pending_push_breakdown_attributes_per_peer_with_tier() {
        with_temp_home(|| {
            ensure_dirs().unwrap();
            // Seed trust.json with three peers at different tiers.
            let trust = json!({
                "agents": {
                    "alpha-fox":   {"tier": "VERIFIED"},
                    "beta-newt":   {"tier": "PENDING_ACK"},
                    "gamma-otter": {"tier": "UNTRUSTED"},
                }
            });
            write_trust(&trust).unwrap();
            // Seed relay.json so alpha (VERIFIED) has
            // bilateral_completed_at set → effective tier stays
            // VERIFIED. Without this, effective_tier would
            // demote alpha to PENDING_ACK (no slot_token) and the
            // fixture would mislead about what's tested.
            let relay = json!({
                "self": null,
                "peers": {
                    "alpha-fox": {
                        "bilateral_completed_at": "2026-06-01T00:00:00Z"
                    }
                }
            });
            write_relay_state(&relay).unwrap();
            // Seed per-peer outboxes: alpha has 2 events, 1 pushed
            // (1 unpushed). beta has 3 events, 0 pushed. gamma has
            // 0 events. The breakdown should:
            // - include alpha with count=1 tier=VERIFIED
            // - include beta with count=3 tier=PENDING_ACK
            // - NOT include gamma (count=0)
            // - sort largest backlog first → beta then alpha
            let out = outbox_dir().unwrap();
            std::fs::write(
                out.join("alpha-fox.jsonl"),
                "{\"event_id\":\"a1\"}\n{\"event_id\":\"a2\"}\n",
            )
            .unwrap();
            std::fs::write(
                out.join("alpha-fox.pushed.jsonl"),
                "{\"event_id\":\"a1\"}\n",
            )
            .unwrap();
            std::fs::write(
                out.join("beta-newt.jsonl"),
                "{\"event_id\":\"b1\"}\n{\"event_id\":\"b2\"}\n{\"event_id\":\"b3\"}\n",
            )
            .unwrap();
            let bd = compute_pending_push_breakdown();
            assert_eq!(bd.len(), 2, "got: {bd:?}");
            assert_eq!(bd[0].peer, "beta-newt");
            assert_eq!(bd[0].tier, "PENDING_ACK");
            assert_eq!(bd[0].count, 3);
            assert_eq!(bd[1].peer, "alpha-fox");
            assert_eq!(bd[1].tier, "VERIFIED");
            assert_eq!(bd[1].count, 1);
            // Aggregate wrapper still matches.
            assert_eq!(compute_pending_push_count(), 4);
        });
    }

    // Gated on Unix: `PermissionsExt` only exists under `std::os::unix`, so
    // without this attribute the whole test module fails to compile elsewhere.
    #[cfg(unix)]
    #[test]
    fn private_key_is_mode_0600() {
        use std::os::unix::fs::PermissionsExt;
        with_temp_home(|| {
            ensure_dirs().unwrap();
            write_private_key(&[1u8; 32]).unwrap();
            let mode = fs::metadata(private_key_path().unwrap())
                .unwrap()
                .permissions()
                .mode();
            assert_eq!(mode & 0o777, 0o600, "got {:o}", mode & 0o777);
        });
    }
}