1use std::io::{BufRead, BufReader, Read};
7use std::time::Duration;
8
9use hickory_proto::rr::RecordType;
10use koi_common::net::resolve_localhost;
11use koi_common::types::ServiceRecord;
12use koi_health::ServiceCheckKind;
13use koi_mdns::protocol::{
14 AdminRegistration, DaemonStatus, RegisterPayload, RegistrationResult, RenewalResult,
15};
16
17const CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
19
20const READ_TIMEOUT: Duration = Duration::from_secs(10);
22
23const HEALTH_TIMEOUT: Duration = Duration::from_millis(200);
25
26#[derive(Debug, thiserror::Error)]
29pub enum ClientError {
30 #[error("Daemon not reachable: {0}")]
31 Unreachable(String),
32
33 #[error("{error}: {message}")]
34 Api { error: String, message: String },
35
36 #[error("Request failed: {0}")]
37 Transport(String),
38
39 #[error("Invalid response: {0}")]
40 Decode(String),
41}
42
43pub type Result<T> = std::result::Result<T, ClientError>;
44
45const DAT_HEADER: &str = "X-Koi-Token";
49
50pub struct KoiClient {
51 endpoint: String,
52 agent: ureq::Agent,
53 token: String,
55}
56
57impl KoiClient {
58 pub fn new(endpoint: &str) -> Self {
59 let clean = endpoint.trim_end_matches('/');
60 let resolved = resolve_localhost(clean);
61 let agent = ureq::AgentBuilder::new()
62 .timeout_connect(CONNECT_TIMEOUT)
63 .timeout_read(READ_TIMEOUT)
64 .build();
65 Self {
66 endpoint: resolved,
67 agent,
68 token: String::new(),
69 }
70 }
71
72 pub fn with_token(endpoint: &str, token: &str) -> Self {
74 let mut client = Self::new(endpoint);
75 client.token = token.to_string();
76 client
77 }
78
79 pub fn from_breadcrumb() -> Option<Self> {
83 let bc = koi_config::breadcrumb::read_breadcrumb()?;
84 Some(Self::with_token(&bc.endpoint, &bc.token))
85 }
86
87 fn auth_get(&self, url: &str) -> ureq::Request {
89 let req = self.agent.get(url);
90 if self.token.is_empty() {
91 req
92 } else {
93 req.set(DAT_HEADER, &self.token)
94 }
95 }
96
97 fn auth_post(&self, url: &str) -> ureq::Request {
99 let req = self.agent.post(url);
100 if self.token.is_empty() {
101 req
102 } else {
103 req.set(DAT_HEADER, &self.token)
104 }
105 }
106
107 fn auth_put(&self, url: &str) -> ureq::Request {
109 let req = self.agent.put(url);
110 if self.token.is_empty() {
111 req
112 } else {
113 req.set(DAT_HEADER, &self.token)
114 }
115 }
116
117 fn auth_delete(&self, url: &str) -> ureq::Request {
119 let req = self.agent.delete(url);
120 if self.token.is_empty() {
121 req
122 } else {
123 req.set(DAT_HEADER, &self.token)
124 }
125 }
126
127 pub fn health(&self) -> Result<()> {
131 let agent = ureq::AgentBuilder::new()
132 .timeout_connect(HEALTH_TIMEOUT)
133 .timeout_read(HEALTH_TIMEOUT)
134 .build();
135 let url = format!("{}/healthz", self.endpoint);
136 agent.get(&url).call().map_err(map_error)?;
137 Ok(())
138 }
139
140 pub fn register(&self, payload: &RegisterPayload) -> Result<RegistrationResult> {
143 let url = format!("{}/v1/mdns/announce", self.endpoint);
144 let json_val =
145 serde_json::to_value(payload).map_err(|e| ClientError::Decode(e.to_string()))?;
146 let resp = self
147 .auth_post(&url)
148 .send_json(json_val)
149 .map_err(map_error)?;
150 let json: serde_json::Value = resp
151 .into_json()
152 .map_err(|e| ClientError::Decode(e.to_string()))?;
153 extract(&json, "registered")
154 }
155
156 pub fn unregister(&self, id: &str) -> Result<()> {
157 let url = format!("{}/v1/mdns/unregister/{id}", self.endpoint);
158 self.auth_delete(&url).call().map_err(map_error)?;
159 Ok(())
160 }
161
162 pub fn heartbeat(&self, id: &str) -> Result<RenewalResult> {
163 let url = format!("{}/v1/mdns/heartbeat/{id}", self.endpoint);
164 let resp = self.auth_put(&url).send_bytes(&[]).map_err(map_error)?;
165 let json: serde_json::Value = resp
166 .into_json()
167 .map_err(|e| ClientError::Decode(e.to_string()))?;
168 extract(&json, "renewed")
169 }
170
171 pub fn resolve(&self, instance: &str) -> Result<ServiceRecord> {
172 let url = format!("{}/v1/mdns/resolve", self.endpoint);
173 let resp = self
174 .auth_get(&url)
175 .query("name", instance)
176 .call()
177 .map_err(map_error)?;
178 let json: serde_json::Value = resp
179 .into_json()
180 .map_err(|e| ClientError::Decode(e.to_string()))?;
181 extract(&json, "resolved")
182 }
183
184 pub fn browse_stream(&self, service_type: &str) -> Result<SseStream> {
186 let url = format!("{}/v1/mdns/discover", self.endpoint);
187 let mut req = self.stream_agent().get(&url);
188 if !self.token.is_empty() {
189 req = req.set(DAT_HEADER, &self.token);
190 }
191 let resp = req.query("type", service_type).call().map_err(map_error)?;
192 Ok(SseStream::new(Box::new(resp.into_reader())))
193 }
194
195 pub fn events_stream(&self, service_type: &str) -> Result<SseStream> {
197 let url = format!("{}/v1/mdns/subscribe", self.endpoint);
198 let mut req = self.stream_agent().get(&url);
199 if !self.token.is_empty() {
200 req = req.set(DAT_HEADER, &self.token);
201 }
202 let resp = req.query("type", service_type).call().map_err(map_error)?;
203 Ok(SseStream::new(Box::new(resp.into_reader())))
204 }
205
206 pub fn unified_status(&self) -> Result<serde_json::Value> {
210 let url = format!("{}/v1/status", self.endpoint);
211 let resp = self.auth_get(&url).call().map_err(map_error)?;
212 resp.into_json()
213 .map_err(|e| ClientError::Decode(e.to_string()))
214 }
215
216 pub fn dns_status(&self) -> Result<serde_json::Value> {
219 self.get_json("/v1/dns/status")
220 }
221
222 pub fn dns_lookup(&self, name: &str, record_type: RecordType) -> Result<serde_json::Value> {
223 let url = format!("{}/v1/dns/lookup", self.endpoint);
224 let resp = self
225 .auth_get(&url)
226 .query("name", name)
227 .query("type", record_type_str(record_type))
228 .call()
229 .map_err(map_error)?;
230 resp.into_json()
231 .map_err(|e| ClientError::Decode(e.to_string()))
232 }
233
234 pub fn dns_list(&self) -> Result<serde_json::Value> {
235 self.get_json("/v1/dns/list")
236 }
237
238 pub fn dns_add(&self, name: &str, ip: &str, ttl: Option<u32>) -> Result<serde_json::Value> {
239 let body = serde_json::json!({
240 "name": name,
241 "ip": ip,
242 "ttl": ttl,
243 });
244 self.post_json("/v1/dns/add", &body)
245 }
246
247 pub fn dns_remove(&self, name: &str) -> Result<serde_json::Value> {
248 let url = format!("{}/v1/dns/remove/{}", self.endpoint, name);
249 let resp = self.auth_delete(&url).call().map_err(map_error)?;
250 resp.into_json()
251 .map_err(|e| ClientError::Decode(e.to_string()))
252 }
253
254 pub fn dns_start(&self) -> Result<serde_json::Value> {
255 self.post_json("/v1/dns/serve", &serde_json::json!({}))
256 }
257
258 pub fn dns_stop(&self) -> Result<serde_json::Value> {
259 self.post_json("/v1/dns/stop", &serde_json::json!({}))
260 }
261
262 pub fn health_status(&self) -> Result<serde_json::Value> {
265 self.get_json("/v1/health/status")
266 }
267
268 pub fn health_add_check(
269 &self,
270 name: &str,
271 kind: ServiceCheckKind,
272 target: &str,
273 interval_secs: u64,
274 timeout_secs: u64,
275 ) -> Result<serde_json::Value> {
276 let body = serde_json::json!({
277 "name": name,
278 "kind": check_kind_str(kind),
279 "target": target,
280 "interval_secs": interval_secs,
281 "timeout_secs": timeout_secs,
282 });
283 self.post_json("/v1/health/add", &body)
284 }
285
286 pub fn health_remove_check(&self, name: &str) -> Result<serde_json::Value> {
287 let url = format!("{}/v1/health/remove/{}", self.endpoint, name);
288 let resp = self.auth_delete(&url).call().map_err(map_error)?;
289 resp.into_json()
290 .map_err(|e| ClientError::Decode(e.to_string()))
291 }
292
293 pub fn proxy_status(&self) -> Result<serde_json::Value> {
296 self.get_json("/v1/proxy/status")
297 }
298
299 pub fn proxy_list(&self) -> Result<serde_json::Value> {
300 self.get_json("/v1/proxy/list")
301 }
302
303 pub fn proxy_add(
304 &self,
305 name: &str,
306 listen_port: u16,
307 backend: &str,
308 allow_remote: bool,
309 ) -> Result<serde_json::Value> {
310 let body = serde_json::json!({
311 "name": name,
312 "listen_port": listen_port,
313 "backend": backend,
314 "allow_remote": allow_remote,
315 });
316 self.post_json("/v1/proxy/add", &body)
317 }
318
319 pub fn proxy_remove(&self, name: &str) -> Result<serde_json::Value> {
320 let url = format!("{}/v1/proxy/remove/{}", self.endpoint, name);
321 let resp = self.auth_delete(&url).call().map_err(map_error)?;
322 resp.into_json()
323 .map_err(|e| ClientError::Decode(e.to_string()))
324 }
325
326 pub fn udp_status(&self) -> Result<serde_json::Value> {
329 self.get_json("/v1/udp/status")
330 }
331
332 pub fn udp_bind(&self, port: u16, addr: &str, lease_secs: u64) -> Result<serde_json::Value> {
333 let body = serde_json::json!({
334 "port": port,
335 "addr": addr,
336 "lease_secs": lease_secs,
337 });
338 self.post_json("/v1/udp/bind", &body)
339 }
340
341 pub fn udp_unbind(&self, id: &str) -> Result<serde_json::Value> {
342 let url = format!("{}/v1/udp/bind/{}", self.endpoint, id);
343 let resp = self.auth_delete(&url).call().map_err(map_error)?;
344 resp.into_json()
345 .map_err(|e| ClientError::Decode(e.to_string()))
346 }
347
348 pub fn udp_send(&self, id: &str, dest: &str, payload_b64: &str) -> Result<serde_json::Value> {
349 let body = serde_json::json!({
350 "dest": dest,
351 "payload": payload_b64,
352 });
353 let path = format!("/v1/udp/send/{id}");
354 self.post_json(&path, &body)
355 }
356
357 pub fn udp_heartbeat(&self, id: &str) -> Result<serde_json::Value> {
358 let path = format!("/v1/udp/heartbeat/{id}");
359 self.put_json(&path, &serde_json::json!({}))
360 }
361
362 pub fn post_json(&self, path: &str, body: &serde_json::Value) -> Result<serde_json::Value> {
366 let url = format!("{}{path}", self.endpoint);
367 let resp = self
368 .auth_post(&url)
369 .send_json(body.clone())
370 .map_err(map_error)?;
371 resp.into_json()
372 .map_err(|e| ClientError::Decode(e.to_string()))
373 }
374
375 pub fn get_json(&self, path: &str) -> Result<serde_json::Value> {
377 let url = format!("{}{path}", self.endpoint);
378 let resp = self.auth_get(&url).call().map_err(map_error)?;
379 resp.into_json()
380 .map_err(|e| ClientError::Decode(e.to_string()))
381 }
382
383 pub fn put_json(&self, path: &str, body: &serde_json::Value) -> Result<serde_json::Value> {
385 let url = format!("{}{path}", self.endpoint);
386 let resp = self
387 .auth_put(&url)
388 .send_json(body.clone())
389 .map_err(map_error)?;
390 resp.into_json()
391 .map_err(|e| ClientError::Decode(e.to_string()))
392 }
393
394 pub fn admin_status(&self) -> Result<DaemonStatus> {
397 let url = format!("{}/v1/mdns/admin/status", self.endpoint);
398 let resp = self.auth_get(&url).call().map_err(map_error)?;
399 resp.into_json()
400 .map_err(|e| ClientError::Decode(e.to_string()))
401 }
402
403 pub fn admin_registrations(&self) -> Result<Vec<AdminRegistration>> {
404 let url = format!("{}/v1/mdns/admin/ls", self.endpoint);
405 let resp = self.auth_get(&url).call().map_err(map_error)?;
406 resp.into_json()
407 .map_err(|e| ClientError::Decode(e.to_string()))
408 }
409
410 pub fn admin_inspect(&self, id: &str) -> Result<AdminRegistration> {
411 let url = format!("{}/v1/mdns/admin/inspect/{id}", self.endpoint);
412 let resp = self.auth_get(&url).call().map_err(map_error)?;
413 resp.into_json()
414 .map_err(|e| ClientError::Decode(e.to_string()))
415 }
416
417 pub fn admin_force_unregister(&self, id: &str) -> Result<()> {
418 let url = format!("{}/v1/mdns/admin/unregister/{id}", self.endpoint);
419 self.auth_delete(&url).call().map_err(map_error)?;
420 Ok(())
421 }
422
423 pub fn admin_drain(&self, id: &str) -> Result<()> {
424 let url = format!("{}/v1/mdns/admin/drain/{id}", self.endpoint);
425 self.auth_post(&url).call().map_err(map_error)?;
426 Ok(())
427 }
428
429 pub fn admin_revive(&self, id: &str) -> Result<()> {
430 let url = format!("{}/v1/mdns/admin/revive/{id}", self.endpoint);
431 self.auth_post(&url).call().map_err(map_error)?;
432 Ok(())
433 }
434
435 pub fn shutdown(&self) -> Result<()> {
439 let url = format!("{}/v1/admin/shutdown", self.endpoint);
440 self.auth_post(&url).call().map_err(map_error)?;
441 Ok(())
442 }
443
444 pub fn get_roster_manifest(&self) -> Result<serde_json::Value> {
448 let url = format!("{}/v1/certmesh/roster", self.endpoint);
449 let resp = self.auth_get(&url).call().map_err(map_error)?;
450 resp.into_json()
451 .map_err(|e| ClientError::Decode(e.to_string()))
452 }
453
454 #[allow(dead_code)]
459 pub fn push_renewal(
460 &self,
461 member_endpoint: &str,
462 request: &serde_json::Value,
463 ) -> Result<serde_json::Value> {
464 let url = format!("{member_endpoint}/v1/certmesh/renew");
465 let resp = self
466 .auth_post(&url)
467 .send_json(request.clone())
468 .map_err(map_error)?;
469 resp.into_json()
470 .map_err(|e| ClientError::Decode(e.to_string()))
471 }
472
473 pub fn health_heartbeat(&self, request: &serde_json::Value) -> Result<serde_json::Value> {
475 let url = format!("{}/v1/certmesh/health", self.endpoint);
476 let resp = self
477 .auth_post(&url)
478 .send_json(request.clone())
479 .map_err(map_error)?;
480 resp.into_json()
481 .map_err(|e| ClientError::Decode(e.to_string()))
482 }
483
484 fn stream_agent(&self) -> ureq::Agent {
488 ureq::AgentBuilder::new()
489 .timeout_connect(CONNECT_TIMEOUT)
490 .build()
491 }
492}
493
494pub struct SseStream {
500 reader: BufReader<Box<dyn Read + Send>>,
501}
502
503impl SseStream {
504 fn new(reader: Box<dyn Read + Send>) -> Self {
505 Self {
506 reader: BufReader::new(reader),
507 }
508 }
509}
510
511impl Iterator for SseStream {
512 type Item = Result<serde_json::Value>;
513
514 fn next(&mut self) -> Option<Self::Item> {
515 loop {
516 let mut line = String::new();
517 match self.reader.read_line(&mut line) {
518 Ok(0) => return None,
519 Ok(_) => {
520 let trimmed = line.trim();
521 if let Some(data) = trimmed.strip_prefix("data:") {
522 let data = data.trim_start();
523 if data.is_empty() {
524 continue;
525 }
526 match serde_json::from_str(data) {
527 Ok(json) => return Some(Ok(json)),
528 Err(e) => return Some(Err(ClientError::Decode(e.to_string()))),
529 }
530 }
531 continue;
532 }
533 Err(e) => return Some(Err(ClientError::Transport(e.to_string()))),
534 }
535 }
536 }
537}
538
539fn map_error(e: ureq::Error) -> ClientError {
542 match e {
543 ureq::Error::Status(_status, resp) => {
544 let body = resp.into_string().unwrap_or_default();
545 if let Ok(json) = serde_json::from_str::<serde_json::Value>(&body) {
546 let error = json
547 .get("error")
548 .and_then(|v| v.as_str())
549 .unwrap_or("unknown")
550 .to_string();
551 let message = json
552 .get("message")
553 .and_then(|v| v.as_str())
554 .unwrap_or(&body)
555 .to_string();
556 ClientError::Api { error, message }
557 } else {
558 ClientError::Api {
559 error: "http_error".into(),
560 message: body,
561 }
562 }
563 }
564 ureq::Error::Transport(t) => ClientError::Unreachable(t.to_string()),
565 }
566}
567
568fn record_type_str(record_type: RecordType) -> &'static str {
569 match record_type {
570 RecordType::A => "A",
571 RecordType::AAAA => "AAAA",
572 RecordType::ANY => "ANY",
573 _ => "A",
574 }
575}
576
577fn check_kind_str(kind: ServiceCheckKind) -> &'static str {
578 match kind {
579 ServiceCheckKind::Http => "http",
580 ServiceCheckKind::Tcp => "tcp",
581 }
582}
583
584fn extract<T: serde::de::DeserializeOwned>(json: &serde_json::Value, key: &str) -> Result<T> {
585 if let Some(err_val) = json.get("error") {
586 let error = err_val.as_str().unwrap_or("unknown").to_string();
587 let message = json
588 .get("message")
589 .and_then(|m| m.as_str())
590 .unwrap_or("Unknown error")
591 .to_string();
592 return Err(ClientError::Api { error, message });
593 }
594 json.get(key)
595 .ok_or_else(|| ClientError::Decode(format!("Missing '{key}' in response")))
596 .and_then(|v| {
597 serde_json::from_value(v.clone()).map_err(|e| ClientError::Decode(e.to_string()))
598 })
599}
600
601#[cfg(test)]
602mod tests {
603 use super::*;
604
605 fn cursor_stream(input: &str) -> SseStream {
608 let cursor = std::io::Cursor::new(input.as_bytes().to_vec());
609 SseStream::new(Box::new(cursor))
610 }
611
612 #[test]
615 fn client_new_strips_trailing_slash() {
616 let client = KoiClient::new("http://localhost:5641/");
618 assert!(
619 client.endpoint == "http://127.0.0.1:5641"
620 || client.endpoint == "http://[::1]:5641"
621 || client.endpoint == "http://localhost:5641",
622 "unexpected endpoint: {}",
623 client.endpoint
624 );
625 assert!(!client.endpoint.ends_with("/"));
626 assert!(client.token.is_empty());
627 }
628
629 #[test]
630 fn client_with_token_sets_token() {
631 let client = KoiClient::with_token("http://10.0.0.1:5641", "my-secret-token");
632 assert_eq!(client.endpoint, "http://10.0.0.1:5641");
633 assert_eq!(client.token, "my-secret-token");
634 }
635
636 #[test]
637 fn client_new_preserves_non_localhost() {
638 let client = KoiClient::new("http://10.0.0.1:5641");
639 assert_eq!(client.endpoint, "http://10.0.0.1:5641");
640 }
641
642 #[test]
643 fn client_new_strips_multiple_trailing_slashes() {
644 let client = KoiClient::new("http://localhost:5641///");
645 assert!(!client.endpoint.ends_with("/"));
646 }
647
648 #[test]
651 fn sse_stream_yields_parsed_json() {
652 let input = "data: {\"foo\": 1}\n\n";
653 let mut stream = cursor_stream(input);
654 let item = stream.next().unwrap().unwrap();
655 assert_eq!(item["foo"], 1);
656 }
657
658 #[test]
659 fn sse_stream_skips_empty_lines() {
660 let input = "\n\n\n\n";
661 let mut stream = cursor_stream(input);
662 assert!(stream.next().is_none());
663 }
664
665 #[test]
666 fn sse_stream_skips_non_data_lines() {
667 let input = "event: message\nretry: 1000\n\n";
668 let mut stream = cursor_stream(input);
669 assert!(stream.next().is_none());
670 }
671
672 #[test]
673 fn sse_stream_handles_leading_space() {
674 let input = "data: {\"hello\": \"world\"}\n";
675 let mut stream = cursor_stream(input);
676 let item = stream.next().unwrap().unwrap();
677 assert_eq!(item["hello"], "world");
678 }
679
680 #[test]
681 fn sse_stream_handles_no_space() {
682 let input = "data:{\"hello\":\"world\"}\n";
683 let mut stream = cursor_stream(input);
684 let item = stream.next().unwrap().unwrap();
685 assert_eq!(item["hello"], "world");
686 }
687
688 #[test]
689 fn sse_stream_yields_multiple_events() {
690 let input = "data: {\"n\": 1}\n\ndata: {\"n\": 2}\n\n";
691 let mut stream = cursor_stream(input);
692 let first = stream.next().unwrap().unwrap();
693 let second = stream.next().unwrap().unwrap();
694 assert_eq!(first["n"], 1);
695 assert_eq!(second["n"], 2);
696 }
697
698 #[test]
699 fn sse_stream_returns_none_on_eof() {
700 let input = "data: {\"n\": 1}\n";
701 let mut stream = cursor_stream(input);
702 let _ = stream.next();
703 assert!(stream.next().is_none());
704 }
705
706 #[test]
707 fn sse_stream_decode_error_on_invalid_json() {
708 let input = "data: {bad json}\n";
709 let mut stream = cursor_stream(input);
710 let item = stream.next().unwrap();
711 assert!(item.is_err());
712 }
713
714 #[test]
715 fn sse_stream_transport_error_on_read_failure() {
716 struct BrokenReader;
717 impl Read for BrokenReader {
718 fn read(&mut self, _buf: &mut [u8]) -> std::io::Result<usize> {
719 Err(std::io::Error::other("boom"))
720 }
721 }
722
723 let stream = SseStream::new(Box::new(BrokenReader));
724 let mut stream = stream;
725 let item = stream.next().unwrap();
726 assert!(item.is_err());
727 }
728}