Skip to main content

rz_cli/
registry.rs

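//! Agent registry for discovery and routing.
//!
//! Persists agent entries to `~/.rz/registry.json` so any process can
//! discover peers by name, transport, or capability. A parallel set of
//! `nats_*` functions stores entries in the NATS KV bucket `rz-agents`
//! when `RZ_HUB` is set.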
5
6use eyre::{Context, Result};
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::fs;
10use std::path::PathBuf;
11use std::time::{SystemTime, UNIX_EPOCH};
12
13/// A single registered agent.
14#[derive(Serialize, Deserialize, Clone, Debug)]
15pub struct AgentEntry {
16    pub name: String,
17    /// UUID or cmux surface ID.
18    pub id: String,
19    /// One of: `cmux`, `http`, `file`, `nats`.
20    pub transport: String,
21    /// Surface ID for cmux, URL for http, agent name for nats.
22    pub endpoint: String,
23    /// Optional tags like `["code","review","search"]`.
24    #[serde(default)]
25    pub capabilities: Vec<String>,
26    /// If true, entry persists in registry after agent exits.
27    #[serde(default)]
28    pub permanent: bool,
29    /// Unix epoch milliseconds when the agent first registered.
30    pub registered_at: u64,
31    /// Unix epoch milliseconds, updated by heartbeat.
32    pub last_seen: u64,
33}
34
/// Inactivity window (10 minutes, in milliseconds) after which a non-permanent
/// agent is considered stale and pruned.
const STALE_THRESHOLD_MS: u64 = 600_000;
37
/// Location of the on-disk registry: `$HOME/.rz/registry.json`.
///
/// When `HOME` is unset (or not valid Unicode), the current directory `.` is
/// used as the base instead.
pub fn registry_path() -> PathBuf {
    let base = match std::env::var("HOME") {
        Ok(home) => PathBuf::from(home),
        Err(_) => PathBuf::from("."),
    };
    base.join(".rz").join("registry.json")
}
43
44/// Load the registry from disk. Returns an empty map if the file does not exist.
45pub fn load() -> Result<HashMap<String, AgentEntry>> {
46    let path = registry_path();
47    if !path.exists() {
48        return Ok(HashMap::new());
49    }
50    let data = fs::read_to_string(&path)
51        .wrap_err_with(|| format!("failed to read {}", path.display()))?;
52    let map: HashMap<String, AgentEntry> =
53        serde_json::from_str(&data).wrap_err("failed to parse registry.json")?;
54    Ok(map)
55}
56
57/// Atomically write the registry to disk (write-tmp then rename).
58pub fn save(registry: &HashMap<String, AgentEntry>) -> Result<()> {
59    let path = registry_path();
60    if let Some(parent) = path.parent() {
61        fs::create_dir_all(parent)
62            .wrap_err_with(|| format!("failed to create {}", parent.display()))?;
63    }
64    let json = serde_json::to_string_pretty(registry)
65        .wrap_err("failed to serialize registry")?;
66
67    // Atomic write: temp file in same dir, then rename.
68    let tmp = path.with_extension("json.tmp");
69    fs::write(&tmp, json.as_bytes())
70        .wrap_err_with(|| format!("failed to write {}", tmp.display()))?;
71    fs::rename(&tmp, &path)
72        .wrap_err_with(|| format!("failed to rename {} -> {}", tmp.display(), path.display()))?;
73    Ok(())
74}
75
76/// Register (or update) an agent entry.
77pub fn register(entry: AgentEntry) -> Result<()> {
78    let mut reg = load()?;
79    reg.insert(entry.name.clone(), entry);
80    save(&reg)
81}
82
83/// Remove an agent by name.
84pub fn deregister(name: &str) -> Result<()> {
85    let mut reg = load()?;
86    reg.remove(name);
87    save(&reg)
88}
89
90/// Look up an agent by name.
91pub fn lookup(name: &str) -> Result<Option<AgentEntry>> {
92    let reg = load()?;
93    Ok(reg.get(name).cloned())
94}
95
96/// Return all registered agents.
97pub fn list_all() -> Result<Vec<AgentEntry>> {
98    let reg = load()?;
99    Ok(reg.into_values().collect())
100}
101
102/// Remove entries whose `last_seen` is older than `max_age_secs` seconds ago.
103/// Returns the number of entries removed.
104pub fn cleanup_stale(max_age_secs: u64) -> Result<usize> {
105    let mut reg = load()?;
106    let now_ms = SystemTime::now()
107        .duration_since(UNIX_EPOCH)
108        .unwrap_or_default()
109        .as_millis() as u64;
110    let cutoff = now_ms.saturating_sub(max_age_secs * 1000);
111
112    let before = reg.len();
113    reg.retain(|_, entry| entry.last_seen >= cutoff);
114    let removed = before - reg.len();
115
116    if removed > 0 {
117        save(&reg)?;
118    }
119    Ok(removed)
120}
121
122/// Update `last_seen` to now for the given agent name.
123pub fn touch(name: &str) -> Result<()> {
124    let mut reg = load()?;
125    if let Some(entry) = reg.get_mut(name) {
126        entry.last_seen = SystemTime::now()
127            .duration_since(UNIX_EPOCH)
128            .unwrap_or_default()
129            .as_millis() as u64;
130        save(&reg)?;
131    }
132    Ok(())
133}
134
// ---------------------------------------------------------------------------
// NATS KV-backed registry
// ---------------------------------------------------------------------------

/// JetStream key-value bucket holding agent entries, keyed by agent name.
const NATS_KV_BUCKET: &str = "rz-agents";
140
141/// Get or create the NATS KV store for agent registration.
142async fn nats_kv_store(
143    js: &async_nats::jetstream::Context,
144) -> Result<async_nats::jetstream::kv::Store> {
145    match js.get_key_value(NATS_KV_BUCKET).await {
146        Ok(store) => Ok(store),
147        Err(_) => {
148            let store = js
149                .create_key_value(async_nats::jetstream::kv::Config {
150                    bucket: NATS_KV_BUCKET.to_string(),
151                    max_age: std::time::Duration::ZERO, // no auto-expiry — managed by heartbeat + pruning
152                    ..Default::default()
153                })
154                .await
155                .map_err(|e| eyre::eyre!("NATS KV create bucket failed: {e}"))?;
156            Ok(store)
157        }
158    }
159}
160
161/// Register an agent in the NATS KV bucket `rz-agents`.
162/// If `RZ_HUB` is not set, returns `Ok(())` silently.
163pub fn nats_register(entry: &AgentEntry) -> Result<()> {
164    let url = match crate::nats_hub::hub_url() {
165        Some(u) => u,
166        None => return Ok(()),
167    };
168    let json = serde_json::to_string(entry)?;
169
170    let rt = tokio::runtime::Builder::new_current_thread()
171        .enable_all()
172        .build()?;
173    rt.block_on(async {
174        let client = async_nats::connect(&url)
175            .await
176            .map_err(|e| eyre::eyre!("NATS connect failed: {e}"))?;
177        let js = async_nats::jetstream::new(client);
178        let store = nats_kv_store(&js).await?;
179        store
180            .put(&entry.name, json.into_bytes().into())
181            .await
182            .map_err(|e| eyre::eyre!("NATS KV put failed: {e}"))?;
183        Ok::<(), eyre::Report>(())
184    })?;
185    Ok(())
186}
187
188/// Remove an agent from the NATS KV bucket `rz-agents`.
189/// If `RZ_HUB` is not set, returns `Ok(())` silently.
190pub fn nats_deregister(name: &str) -> Result<()> {
191    let url = match crate::nats_hub::hub_url() {
192        Some(u) => u,
193        None => return Ok(()),
194    };
195
196    let rt = tokio::runtime::Builder::new_current_thread()
197        .enable_all()
198        .build()?;
199    rt.block_on(async {
200        let client = async_nats::connect(&url)
201            .await
202            .map_err(|e| eyre::eyre!("NATS connect failed: {e}"))?;
203        let js = async_nats::jetstream::new(client);
204        let store = nats_kv_store(&js).await?;
205        // Delete returns an error if key doesn't exist; ignore it.
206        let _ = store.delete(name).await;
207        Ok::<(), eyre::Report>(())
208    })?;
209    Ok(())
210}
211
212/// List all agents from the NATS KV bucket `rz-agents`.
213/// If `RZ_HUB` is not set or bucket doesn't exist, returns an empty vec.
214pub fn nats_list() -> Result<Vec<AgentEntry>> {
215    let url = match crate::nats_hub::hub_url() {
216        Some(u) => u,
217        None => return Ok(Vec::new()),
218    };
219
220    let rt = tokio::runtime::Builder::new_current_thread()
221        .enable_all()
222        .build()?;
223    rt.block_on(async {
224        let client = async_nats::connect(&url)
225            .await
226            .map_err(|e| eyre::eyre!("NATS connect failed: {e}"))?;
227        let js = async_nats::jetstream::new(client);
228        let store = match js.get_key_value(NATS_KV_BUCKET).await {
229            Ok(s) => s,
230            Err(_) => return Ok(Vec::new()), // bucket doesn't exist
231        };
232
233        use futures::StreamExt;
234        let mut keys = match store.keys().await {
235            Ok(k) => k,
236            Err(_) => return Ok(Vec::new()),
237        };
238
239        let now_ms = SystemTime::now()
240            .duration_since(UNIX_EPOCH)
241            .unwrap_or_default()
242            .as_millis() as u64;
243
244        let mut entries = Vec::new();
245        let mut to_prune = Vec::new();
246        while let Some(key) = keys.next().await {
247            let key = match key {
248                Ok(k) => k,
249                Err(_) => continue,
250            };
251            if let Ok(Some(kv_entry)) = store.get(&key).await {
252                if let Ok(agent) =
253                    serde_json::from_slice::<AgentEntry>(&kv_entry)
254                {
255                    // Prune stale non-permanent agents.
256                    if !agent.permanent && now_ms.saturating_sub(agent.last_seen) > STALE_THRESHOLD_MS {
257                        to_prune.push(agent.name.clone());
258                        continue;
259                    }
260                    entries.push(agent);
261                }
262            }
263        }
264        // Clean up stale entries.
265        for name in &to_prune {
266            let _ = store.delete(name).await;
267        }
268        Ok(entries)
269    })
270}
271
272/// Look up a single agent by name from the NATS KV bucket.
273/// If `RZ_HUB` is not set or key not found, returns `None`.
274pub fn nats_lookup(name: &str) -> Result<Option<AgentEntry>> {
275    let url = match crate::nats_hub::hub_url() {
276        Some(u) => u,
277        None => return Ok(None),
278    };
279
280    let rt = tokio::runtime::Builder::new_current_thread()
281        .enable_all()
282        .build()?;
283    rt.block_on(async {
284        let client = async_nats::connect(&url)
285            .await
286            .map_err(|e| eyre::eyre!("NATS connect failed: {e}"))?;
287        let js = async_nats::jetstream::new(client);
288        let store = match js.get_key_value(NATS_KV_BUCKET).await {
289            Ok(s) => s,
290            Err(_) => return Ok(None),
291        };
292        match store.get(name).await {
293            Ok(Some(value)) => {
294                let agent = serde_json::from_slice::<AgentEntry>(&value)
295                    .map_err(|e| eyre::eyre!("NATS KV deserialize failed: {e}"))?;
296                // Prune stale non-permanent agents.
297                if !agent.permanent {
298                    let now_ms = SystemTime::now()
299                        .duration_since(UNIX_EPOCH)
300                        .unwrap_or_default()
301                        .as_millis() as u64;
302                    if now_ms.saturating_sub(agent.last_seen) > STALE_THRESHOLD_MS {
303                        let _ = store.delete(name).await;
304                        return Ok(None);
305                    }
306                }
307                Ok(Some(agent))
308            }
309            Ok(None) | Err(_) => Ok(None),
310        }
311    })
312}
313
314/// Re-put an agent entry with updated `last_seen` to keep it alive.
315/// If `RZ_HUB` is not set, returns `Ok(())` silently.
316pub fn nats_heartbeat(name: &str, entry: &AgentEntry) -> Result<()> {
317    let url = match crate::nats_hub::hub_url() {
318        Some(u) => u,
319        None => return Ok(()),
320    };
321    let mut refreshed = entry.clone();
322    refreshed.last_seen = SystemTime::now()
323        .duration_since(UNIX_EPOCH)
324        .unwrap_or_default()
325        .as_millis() as u64;
326    let json = serde_json::to_string(&refreshed)?;
327
328    let rt = tokio::runtime::Builder::new_current_thread()
329        .enable_all()
330        .build()?;
331    rt.block_on(async {
332        let client = async_nats::connect(&url)
333            .await
334            .map_err(|e| eyre::eyre!("NATS connect failed: {e}"))?;
335        let js = async_nats::jetstream::new(client);
336        let store = nats_kv_store(&js).await?;
337        store
338            .put(name, json.into_bytes().into())
339            .await
340            .map_err(|e| eyre::eyre!("NATS KV heartbeat put failed: {e}"))?;
341        Ok::<(), eyre::Report>(())
342    })?;
343    Ok(())
344}