1use eyre::{Context, Result};
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::fs;
10use std::path::PathBuf;
11use std::time::{SystemTime, UNIX_EPOCH};
12
13#[derive(Serialize, Deserialize, Clone, Debug)]
15pub struct AgentEntry {
16 pub name: String,
17 pub id: String,
19 pub transport: String,
21 pub endpoint: String,
23 #[serde(default)]
25 pub capabilities: Vec<String>,
26 #[serde(default)]
28 pub permanent: bool,
29 pub registered_at: u64,
31 pub last_seen: u64,
33}
34
35const STALE_THRESHOLD_MS: u64 = 10 * 60 * 1000;
37
38pub fn registry_path() -> PathBuf {
40 let home = std::env::var("HOME").unwrap_or_else(|_| ".".into());
41 PathBuf::from(home).join(".rz").join("registry.json")
42}
43
44pub fn load() -> Result<HashMap<String, AgentEntry>> {
46 let path = registry_path();
47 if !path.exists() {
48 return Ok(HashMap::new());
49 }
50 let data = fs::read_to_string(&path)
51 .wrap_err_with(|| format!("failed to read {}", path.display()))?;
52 let map: HashMap<String, AgentEntry> =
53 serde_json::from_str(&data).wrap_err("failed to parse registry.json")?;
54 Ok(map)
55}
56
57pub fn save(registry: &HashMap<String, AgentEntry>) -> Result<()> {
59 let path = registry_path();
60 if let Some(parent) = path.parent() {
61 fs::create_dir_all(parent)
62 .wrap_err_with(|| format!("failed to create {}", parent.display()))?;
63 }
64 let json = serde_json::to_string_pretty(registry)
65 .wrap_err("failed to serialize registry")?;
66
67 let tmp = path.with_extension("json.tmp");
69 fs::write(&tmp, json.as_bytes())
70 .wrap_err_with(|| format!("failed to write {}", tmp.display()))?;
71 fs::rename(&tmp, &path)
72 .wrap_err_with(|| format!("failed to rename {} -> {}", tmp.display(), path.display()))?;
73 Ok(())
74}
75
76pub fn register(entry: AgentEntry) -> Result<()> {
78 let mut reg = load()?;
79 reg.insert(entry.name.clone(), entry);
80 save(®)
81}
82
83pub fn deregister(name: &str) -> Result<()> {
85 let mut reg = load()?;
86 reg.remove(name);
87 save(®)
88}
89
90pub fn lookup(name: &str) -> Result<Option<AgentEntry>> {
92 let reg = load()?;
93 Ok(reg.get(name).cloned())
94}
95
96pub fn list_all() -> Result<Vec<AgentEntry>> {
98 let reg = load()?;
99 Ok(reg.into_values().collect())
100}
101
102pub fn cleanup_stale(max_age_secs: u64) -> Result<usize> {
105 let mut reg = load()?;
106 let now_ms = SystemTime::now()
107 .duration_since(UNIX_EPOCH)
108 .unwrap_or_default()
109 .as_millis() as u64;
110 let cutoff = now_ms.saturating_sub(max_age_secs * 1000);
111
112 let before = reg.len();
113 reg.retain(|_, entry| entry.last_seen >= cutoff);
114 let removed = before - reg.len();
115
116 if removed > 0 {
117 save(®)?;
118 }
119 Ok(removed)
120}
121
122pub fn touch(name: &str) -> Result<()> {
124 let mut reg = load()?;
125 if let Some(entry) = reg.get_mut(name) {
126 entry.last_seen = SystemTime::now()
127 .duration_since(UNIX_EPOCH)
128 .unwrap_or_default()
129 .as_millis() as u64;
130 save(®)?;
131 }
132 Ok(())
133}
134
135const NATS_KV_BUCKET: &str = "rz-agents";
140
141async fn nats_kv_store(
143 js: &async_nats::jetstream::Context,
144) -> Result<async_nats::jetstream::kv::Store> {
145 match js.get_key_value(NATS_KV_BUCKET).await {
146 Ok(store) => Ok(store),
147 Err(_) => {
148 let store = js
149 .create_key_value(async_nats::jetstream::kv::Config {
150 bucket: NATS_KV_BUCKET.to_string(),
151 max_age: std::time::Duration::ZERO, ..Default::default()
153 })
154 .await
155 .map_err(|e| eyre::eyre!("NATS KV create bucket failed: {e}"))?;
156 Ok(store)
157 }
158 }
159}
160
161pub fn nats_register(entry: &AgentEntry) -> Result<()> {
164 let url = match crate::nats_hub::hub_url() {
165 Some(u) => u,
166 None => return Ok(()),
167 };
168 let json = serde_json::to_string(entry)?;
169
170 let rt = tokio::runtime::Builder::new_current_thread()
171 .enable_all()
172 .build()?;
173 rt.block_on(async {
174 let client = async_nats::connect(&url)
175 .await
176 .map_err(|e| eyre::eyre!("NATS connect failed: {e}"))?;
177 let js = async_nats::jetstream::new(client);
178 let store = nats_kv_store(&js).await?;
179 store
180 .put(&entry.name, json.into_bytes().into())
181 .await
182 .map_err(|e| eyre::eyre!("NATS KV put failed: {e}"))?;
183 Ok::<(), eyre::Report>(())
184 })?;
185 Ok(())
186}
187
188pub fn nats_deregister(name: &str) -> Result<()> {
191 let url = match crate::nats_hub::hub_url() {
192 Some(u) => u,
193 None => return Ok(()),
194 };
195
196 let rt = tokio::runtime::Builder::new_current_thread()
197 .enable_all()
198 .build()?;
199 rt.block_on(async {
200 let client = async_nats::connect(&url)
201 .await
202 .map_err(|e| eyre::eyre!("NATS connect failed: {e}"))?;
203 let js = async_nats::jetstream::new(client);
204 let store = nats_kv_store(&js).await?;
205 let _ = store.delete(name).await;
207 Ok::<(), eyre::Report>(())
208 })?;
209 Ok(())
210}
211
212pub fn nats_list() -> Result<Vec<AgentEntry>> {
215 let url = match crate::nats_hub::hub_url() {
216 Some(u) => u,
217 None => return Ok(Vec::new()),
218 };
219
220 let rt = tokio::runtime::Builder::new_current_thread()
221 .enable_all()
222 .build()?;
223 rt.block_on(async {
224 let client = async_nats::connect(&url)
225 .await
226 .map_err(|e| eyre::eyre!("NATS connect failed: {e}"))?;
227 let js = async_nats::jetstream::new(client);
228 let store = match js.get_key_value(NATS_KV_BUCKET).await {
229 Ok(s) => s,
230 Err(_) => return Ok(Vec::new()), };
232
233 use futures::StreamExt;
234 let mut keys = match store.keys().await {
235 Ok(k) => k,
236 Err(_) => return Ok(Vec::new()),
237 };
238
239 let now_ms = SystemTime::now()
240 .duration_since(UNIX_EPOCH)
241 .unwrap_or_default()
242 .as_millis() as u64;
243
244 let mut entries = Vec::new();
245 let mut to_prune = Vec::new();
246 while let Some(key) = keys.next().await {
247 let key = match key {
248 Ok(k) => k,
249 Err(_) => continue,
250 };
251 if let Ok(Some(kv_entry)) = store.get(&key).await {
252 if let Ok(agent) =
253 serde_json::from_slice::<AgentEntry>(&kv_entry)
254 {
255 if !agent.permanent && now_ms.saturating_sub(agent.last_seen) > STALE_THRESHOLD_MS {
257 to_prune.push(agent.name.clone());
258 continue;
259 }
260 entries.push(agent);
261 }
262 }
263 }
264 for name in &to_prune {
266 let _ = store.delete(name).await;
267 }
268 Ok(entries)
269 })
270}
271
272pub fn nats_lookup(name: &str) -> Result<Option<AgentEntry>> {
275 let url = match crate::nats_hub::hub_url() {
276 Some(u) => u,
277 None => return Ok(None),
278 };
279
280 let rt = tokio::runtime::Builder::new_current_thread()
281 .enable_all()
282 .build()?;
283 rt.block_on(async {
284 let client = async_nats::connect(&url)
285 .await
286 .map_err(|e| eyre::eyre!("NATS connect failed: {e}"))?;
287 let js = async_nats::jetstream::new(client);
288 let store = match js.get_key_value(NATS_KV_BUCKET).await {
289 Ok(s) => s,
290 Err(_) => return Ok(None),
291 };
292 match store.get(name).await {
293 Ok(Some(value)) => {
294 let agent = serde_json::from_slice::<AgentEntry>(&value)
295 .map_err(|e| eyre::eyre!("NATS KV deserialize failed: {e}"))?;
296 if !agent.permanent {
298 let now_ms = SystemTime::now()
299 .duration_since(UNIX_EPOCH)
300 .unwrap_or_default()
301 .as_millis() as u64;
302 if now_ms.saturating_sub(agent.last_seen) > STALE_THRESHOLD_MS {
303 let _ = store.delete(name).await;
304 return Ok(None);
305 }
306 }
307 Ok(Some(agent))
308 }
309 Ok(None) | Err(_) => Ok(None),
310 }
311 })
312}
313
314pub fn nats_heartbeat(name: &str, entry: &AgentEntry) -> Result<()> {
317 let url = match crate::nats_hub::hub_url() {
318 Some(u) => u,
319 None => return Ok(()),
320 };
321 let mut refreshed = entry.clone();
322 refreshed.last_seen = SystemTime::now()
323 .duration_since(UNIX_EPOCH)
324 .unwrap_or_default()
325 .as_millis() as u64;
326 let json = serde_json::to_string(&refreshed)?;
327
328 let rt = tokio::runtime::Builder::new_current_thread()
329 .enable_all()
330 .build()?;
331 rt.block_on(async {
332 let client = async_nats::connect(&url)
333 .await
334 .map_err(|e| eyre::eyre!("NATS connect failed: {e}"))?;
335 let js = async_nats::jetstream::new(client);
336 let store = nats_kv_store(&js).await?;
337 store
338 .put(name, json.into_bytes().into())
339 .await
340 .map_err(|e| eyre::eyre!("NATS KV heartbeat put failed: {e}"))?;
341 Ok::<(), eyre::Report>(())
342 })?;
343 Ok(())
344}