1use std::io::{BufRead, BufReader, Read};
7use std::time::Duration;
8
9use hickory_proto::rr::RecordType;
10use koi_common::mdns_protocol::{
11 AdminRegistration, DaemonStatus, RegisterPayload, RegistrationResult, RenewalResult,
12};
13use koi_common::net::resolve_localhost;
14use koi_common::types::{ServiceCheckKind, ServiceRecord};
15
16const CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
18
19const READ_TIMEOUT: Duration = Duration::from_secs(10);
21
22const HEALTH_TIMEOUT: Duration = Duration::from_millis(200);
24
25#[derive(Debug, thiserror::Error)]
28pub enum ClientError {
29 #[error("Daemon not reachable: {0}")]
30 Unreachable(String),
31
32 #[error("{error}: {message}")]
33 Api { error: String, message: String },
34
35 #[error("Request failed: {0}")]
36 Transport(String),
37
38 #[error("Invalid response: {0}")]
39 Decode(String),
40}
41
42pub type Result<T> = std::result::Result<T, ClientError>;
43
44const DAT_HEADER: &str = "X-Koi-Token";
48
49pub struct KoiClient {
50 endpoint: String,
51 agent: ureq::Agent,
52 token: String,
54}
55
56impl KoiClient {
57 pub fn new(endpoint: &str) -> Self {
58 let clean = endpoint.trim_end_matches('/');
59 let resolved = resolve_localhost(clean);
60 let agent = ureq::AgentBuilder::new()
61 .timeout_connect(CONNECT_TIMEOUT)
62 .timeout_read(READ_TIMEOUT)
63 .build();
64 Self {
65 endpoint: resolved,
66 agent,
67 token: String::new(),
68 }
69 }
70
71 pub fn with_token(endpoint: &str, token: &str) -> Self {
73 let mut client = Self::new(endpoint);
74 client.token = token.to_string();
75 client
76 }
77
78 pub fn from_breadcrumb() -> Option<Self> {
82 let bc = koi_config::breadcrumb::read_breadcrumb()?;
83 Some(Self::with_token(&bc.endpoint, &bc.token))
84 }
85
86 fn auth_get(&self, url: &str) -> ureq::Request {
88 let req = self.agent.get(url);
89 if self.token.is_empty() {
90 req
91 } else {
92 req.set(DAT_HEADER, &self.token)
93 }
94 }
95
96 fn auth_post(&self, url: &str) -> ureq::Request {
98 let req = self.agent.post(url);
99 if self.token.is_empty() {
100 req
101 } else {
102 req.set(DAT_HEADER, &self.token)
103 }
104 }
105
106 fn auth_put(&self, url: &str) -> ureq::Request {
108 let req = self.agent.put(url);
109 if self.token.is_empty() {
110 req
111 } else {
112 req.set(DAT_HEADER, &self.token)
113 }
114 }
115
116 fn auth_delete(&self, url: &str) -> ureq::Request {
118 let req = self.agent.delete(url);
119 if self.token.is_empty() {
120 req
121 } else {
122 req.set(DAT_HEADER, &self.token)
123 }
124 }
125
126 pub fn health(&self) -> Result<()> {
130 let agent = ureq::AgentBuilder::new()
131 .timeout_connect(HEALTH_TIMEOUT)
132 .timeout_read(HEALTH_TIMEOUT)
133 .build();
134 let url = format!("{}/healthz", self.endpoint);
135 agent.get(&url).call().map_err(map_error)?;
136 Ok(())
137 }
138
139 pub fn register(&self, payload: &RegisterPayload) -> Result<RegistrationResult> {
142 let url = format!("{}/v1/mdns/announce", self.endpoint);
143 let json_val =
144 serde_json::to_value(payload).map_err(|e| ClientError::Decode(e.to_string()))?;
145 let resp = self
146 .auth_post(&url)
147 .send_json(json_val)
148 .map_err(map_error)?;
149 let json: serde_json::Value = resp
150 .into_json()
151 .map_err(|e| ClientError::Decode(e.to_string()))?;
152 extract(&json, "registered")
153 }
154
155 pub fn unregister(&self, id: &str) -> Result<()> {
156 let url = format!("{}/v1/mdns/unregister/{id}", self.endpoint);
157 self.auth_delete(&url).call().map_err(map_error)?;
158 Ok(())
159 }
160
161 pub fn heartbeat(&self, id: &str) -> Result<RenewalResult> {
162 let url = format!("{}/v1/mdns/heartbeat/{id}", self.endpoint);
163 let resp = self.auth_put(&url).send_bytes(&[]).map_err(map_error)?;
164 let json: serde_json::Value = resp
165 .into_json()
166 .map_err(|e| ClientError::Decode(e.to_string()))?;
167 extract(&json, "renewed")
168 }
169
170 pub fn resolve(&self, instance: &str) -> Result<ServiceRecord> {
171 let url = format!("{}/v1/mdns/resolve", self.endpoint);
172 let resp = self
173 .auth_get(&url)
174 .query("name", instance)
175 .call()
176 .map_err(map_error)?;
177 let json: serde_json::Value = resp
178 .into_json()
179 .map_err(|e| ClientError::Decode(e.to_string()))?;
180 extract(&json, "resolved")
181 }
182
183 pub fn browse_stream(&self, service_type: &str) -> Result<SseStream> {
185 let url = format!("{}/v1/mdns/discover", self.endpoint);
186 let mut req = self.stream_agent().get(&url);
187 if !self.token.is_empty() {
188 req = req.set(DAT_HEADER, &self.token);
189 }
190 let resp = req.query("type", service_type).call().map_err(map_error)?;
191 Ok(SseStream::new(Box::new(resp.into_reader())))
192 }
193
194 pub fn events_stream(&self, service_type: &str) -> Result<SseStream> {
196 let url = format!("{}/v1/mdns/subscribe", self.endpoint);
197 let mut req = self.stream_agent().get(&url);
198 if !self.token.is_empty() {
199 req = req.set(DAT_HEADER, &self.token);
200 }
201 let resp = req.query("type", service_type).call().map_err(map_error)?;
202 Ok(SseStream::new(Box::new(resp.into_reader())))
203 }
204
205 pub fn unified_status(&self) -> Result<serde_json::Value> {
209 let url = format!("{}/v1/status", self.endpoint);
210 let resp = self.auth_get(&url).call().map_err(map_error)?;
211 resp.into_json()
212 .map_err(|e| ClientError::Decode(e.to_string()))
213 }
214
215 pub fn dns_status(&self) -> Result<serde_json::Value> {
218 self.get_json("/v1/dns/status")
219 }
220
221 pub fn dns_lookup(&self, name: &str, record_type: RecordType) -> Result<serde_json::Value> {
222 let url = format!("{}/v1/dns/lookup", self.endpoint);
223 let resp = self
224 .auth_get(&url)
225 .query("name", name)
226 .query("type", record_type_str(record_type))
227 .call()
228 .map_err(map_error)?;
229 resp.into_json()
230 .map_err(|e| ClientError::Decode(e.to_string()))
231 }
232
233 pub fn dns_list(&self) -> Result<serde_json::Value> {
234 self.get_json("/v1/dns/list")
235 }
236
237 pub fn dns_add(&self, name: &str, ip: &str, ttl: Option<u32>) -> Result<serde_json::Value> {
238 let body = serde_json::json!({
239 "name": name,
240 "ip": ip,
241 "ttl": ttl,
242 });
243 self.post_json("/v1/dns/add", &body)
244 }
245
246 pub fn dns_remove(&self, name: &str) -> Result<serde_json::Value> {
247 let url = format!("{}/v1/dns/remove/{}", self.endpoint, name);
248 let resp = self.auth_delete(&url).call().map_err(map_error)?;
249 resp.into_json()
250 .map_err(|e| ClientError::Decode(e.to_string()))
251 }
252
253 pub fn dns_start(&self) -> Result<serde_json::Value> {
254 self.post_json("/v1/dns/serve", &serde_json::json!({}))
255 }
256
257 pub fn dns_stop(&self) -> Result<serde_json::Value> {
258 self.post_json("/v1/dns/stop", &serde_json::json!({}))
259 }
260
261 pub fn health_status(&self) -> Result<serde_json::Value> {
264 self.get_json("/v1/health/status")
265 }
266
267 pub fn health_add_check(
268 &self,
269 name: &str,
270 kind: ServiceCheckKind,
271 target: &str,
272 interval_secs: u64,
273 timeout_secs: u64,
274 ) -> Result<serde_json::Value> {
275 let body = serde_json::json!({
276 "name": name,
277 "kind": check_kind_str(kind),
278 "target": target,
279 "interval_secs": interval_secs,
280 "timeout_secs": timeout_secs,
281 });
282 self.post_json("/v1/health/add", &body)
283 }
284
285 pub fn health_remove_check(&self, name: &str) -> Result<serde_json::Value> {
286 let url = format!("{}/v1/health/remove/{}", self.endpoint, name);
287 let resp = self.auth_delete(&url).call().map_err(map_error)?;
288 resp.into_json()
289 .map_err(|e| ClientError::Decode(e.to_string()))
290 }
291
292 pub fn proxy_status(&self) -> Result<serde_json::Value> {
295 self.get_json("/v1/proxy/status")
296 }
297
298 pub fn proxy_list(&self) -> Result<serde_json::Value> {
299 self.get_json("/v1/proxy/list")
300 }
301
302 pub fn proxy_add(
303 &self,
304 name: &str,
305 listen_port: u16,
306 backend: &str,
307 allow_remote: bool,
308 ) -> Result<serde_json::Value> {
309 let body = serde_json::json!({
310 "name": name,
311 "listen_port": listen_port,
312 "backend": backend,
313 "allow_remote": allow_remote,
314 });
315 self.post_json("/v1/proxy/add", &body)
316 }
317
318 pub fn proxy_remove(&self, name: &str) -> Result<serde_json::Value> {
319 let url = format!("{}/v1/proxy/remove/{}", self.endpoint, name);
320 let resp = self.auth_delete(&url).call().map_err(map_error)?;
321 resp.into_json()
322 .map_err(|e| ClientError::Decode(e.to_string()))
323 }
324
325 pub fn udp_status(&self) -> Result<serde_json::Value> {
328 self.get_json("/v1/udp/status")
329 }
330
331 pub fn udp_bind(&self, port: u16, addr: &str, lease_secs: u64) -> Result<serde_json::Value> {
332 let body = serde_json::json!({
333 "port": port,
334 "addr": addr,
335 "lease_secs": lease_secs,
336 });
337 self.post_json("/v1/udp/bind", &body)
338 }
339
340 pub fn udp_unbind(&self, id: &str) -> Result<serde_json::Value> {
341 let url = format!("{}/v1/udp/bind/{}", self.endpoint, id);
342 let resp = self.auth_delete(&url).call().map_err(map_error)?;
343 resp.into_json()
344 .map_err(|e| ClientError::Decode(e.to_string()))
345 }
346
347 pub fn udp_send(&self, id: &str, dest: &str, payload_b64: &str) -> Result<serde_json::Value> {
348 let body = serde_json::json!({
349 "dest": dest,
350 "payload": payload_b64,
351 });
352 let path = format!("/v1/udp/send/{id}");
353 self.post_json(&path, &body)
354 }
355
356 pub fn udp_heartbeat(&self, id: &str) -> Result<serde_json::Value> {
357 let path = format!("/v1/udp/heartbeat/{id}");
358 self.put_json(&path, &serde_json::json!({}))
359 }
360
361 pub fn post_json(&self, path: &str, body: &serde_json::Value) -> Result<serde_json::Value> {
365 let url = format!("{}{path}", self.endpoint);
366 let resp = self
367 .auth_post(&url)
368 .send_json(body.clone())
369 .map_err(map_error)?;
370 resp.into_json()
371 .map_err(|e| ClientError::Decode(e.to_string()))
372 }
373
374 pub fn get_json(&self, path: &str) -> Result<serde_json::Value> {
376 let url = format!("{}{path}", self.endpoint);
377 let resp = self.auth_get(&url).call().map_err(map_error)?;
378 resp.into_json()
379 .map_err(|e| ClientError::Decode(e.to_string()))
380 }
381
382 pub fn put_json(&self, path: &str, body: &serde_json::Value) -> Result<serde_json::Value> {
384 let url = format!("{}{path}", self.endpoint);
385 let resp = self
386 .auth_put(&url)
387 .send_json(body.clone())
388 .map_err(map_error)?;
389 resp.into_json()
390 .map_err(|e| ClientError::Decode(e.to_string()))
391 }
392
393 pub fn admin_status(&self) -> Result<DaemonStatus> {
396 let url = format!("{}/v1/mdns/admin/status", self.endpoint);
397 let resp = self.auth_get(&url).call().map_err(map_error)?;
398 resp.into_json()
399 .map_err(|e| ClientError::Decode(e.to_string()))
400 }
401
402 pub fn admin_registrations(&self) -> Result<Vec<AdminRegistration>> {
403 let url = format!("{}/v1/mdns/admin/ls", self.endpoint);
404 let resp = self.auth_get(&url).call().map_err(map_error)?;
405 resp.into_json()
406 .map_err(|e| ClientError::Decode(e.to_string()))
407 }
408
409 pub fn admin_inspect(&self, id: &str) -> Result<AdminRegistration> {
410 let url = format!("{}/v1/mdns/admin/inspect/{id}", self.endpoint);
411 let resp = self.auth_get(&url).call().map_err(map_error)?;
412 resp.into_json()
413 .map_err(|e| ClientError::Decode(e.to_string()))
414 }
415
416 pub fn admin_force_unregister(&self, id: &str) -> Result<()> {
417 let url = format!("{}/v1/mdns/admin/unregister/{id}", self.endpoint);
418 self.auth_delete(&url).call().map_err(map_error)?;
419 Ok(())
420 }
421
422 pub fn admin_drain(&self, id: &str) -> Result<()> {
423 let url = format!("{}/v1/mdns/admin/drain/{id}", self.endpoint);
424 self.auth_post(&url).call().map_err(map_error)?;
425 Ok(())
426 }
427
428 pub fn admin_revive(&self, id: &str) -> Result<()> {
429 let url = format!("{}/v1/mdns/admin/revive/{id}", self.endpoint);
430 self.auth_post(&url).call().map_err(map_error)?;
431 Ok(())
432 }
433
434 pub fn shutdown(&self) -> Result<()> {
438 let url = format!("{}/v1/admin/shutdown", self.endpoint);
439 self.auth_post(&url).call().map_err(map_error)?;
440 Ok(())
441 }
442
443 pub fn get_roster_manifest(&self) -> Result<serde_json::Value> {
447 let url = format!("{}/v1/certmesh/roster", self.endpoint);
448 let resp = self.auth_get(&url).call().map_err(map_error)?;
449 resp.into_json()
450 .map_err(|e| ClientError::Decode(e.to_string()))
451 }
452
453 #[allow(dead_code)]
458 pub fn push_renewal(
459 &self,
460 member_endpoint: &str,
461 request: &serde_json::Value,
462 ) -> Result<serde_json::Value> {
463 let url = format!("{member_endpoint}/v1/certmesh/renew");
464 let resp = self
465 .auth_post(&url)
466 .send_json(request.clone())
467 .map_err(map_error)?;
468 resp.into_json()
469 .map_err(|e| ClientError::Decode(e.to_string()))
470 }
471
472 pub fn health_heartbeat(&self, request: &serde_json::Value) -> Result<serde_json::Value> {
474 let url = format!("{}/v1/certmesh/health", self.endpoint);
475 let resp = self
476 .auth_post(&url)
477 .send_json(request.clone())
478 .map_err(map_error)?;
479 resp.into_json()
480 .map_err(|e| ClientError::Decode(e.to_string()))
481 }
482
483 fn stream_agent(&self) -> ureq::Agent {
487 ureq::AgentBuilder::new()
488 .timeout_connect(CONNECT_TIMEOUT)
489 .build()
490 }
491}
492
493pub struct SseStream {
499 reader: BufReader<Box<dyn Read + Send>>,
500}
501
502impl SseStream {
503 fn new(reader: Box<dyn Read + Send>) -> Self {
504 Self {
505 reader: BufReader::new(reader),
506 }
507 }
508}
509
510impl Iterator for SseStream {
511 type Item = Result<serde_json::Value>;
512
513 fn next(&mut self) -> Option<Self::Item> {
514 loop {
515 let mut line = String::new();
516 match self.reader.read_line(&mut line) {
517 Ok(0) => return None,
518 Ok(_) => {
519 let trimmed = line.trim();
520 if let Some(data) = trimmed.strip_prefix("data:") {
521 let data = data.trim_start();
522 if data.is_empty() {
523 continue;
524 }
525 match serde_json::from_str(data) {
526 Ok(json) => return Some(Ok(json)),
527 Err(e) => return Some(Err(ClientError::Decode(e.to_string()))),
528 }
529 }
530 continue;
531 }
532 Err(e) => return Some(Err(ClientError::Transport(e.to_string()))),
533 }
534 }
535 }
536}
537
538fn map_error(e: ureq::Error) -> ClientError {
541 match e {
542 ureq::Error::Status(_status, resp) => {
543 let body = resp.into_string().unwrap_or_default();
544 if let Ok(json) = serde_json::from_str::<serde_json::Value>(&body) {
545 let error = json
546 .get("error")
547 .and_then(|v| v.as_str())
548 .unwrap_or("unknown")
549 .to_string();
550 let message = json
551 .get("message")
552 .and_then(|v| v.as_str())
553 .unwrap_or(&body)
554 .to_string();
555 ClientError::Api { error, message }
556 } else {
557 ClientError::Api {
558 error: "http_error".into(),
559 message: body,
560 }
561 }
562 }
563 ureq::Error::Transport(t) => ClientError::Unreachable(t.to_string()),
564 }
565}
566
567fn record_type_str(record_type: RecordType) -> &'static str {
568 match record_type {
569 RecordType::A => "A",
570 RecordType::AAAA => "AAAA",
571 RecordType::ANY => "ANY",
572 _ => "A",
573 }
574}
575
576fn check_kind_str(kind: ServiceCheckKind) -> &'static str {
577 match kind {
578 ServiceCheckKind::Http => "http",
579 ServiceCheckKind::Tcp => "tcp",
580 }
581}
582
583fn extract<T: serde::de::DeserializeOwned>(json: &serde_json::Value, key: &str) -> Result<T> {
584 if let Some(err_val) = json.get("error") {
585 let error = err_val.as_str().unwrap_or("unknown").to_string();
586 let message = json
587 .get("message")
588 .and_then(|m| m.as_str())
589 .unwrap_or("Unknown error")
590 .to_string();
591 return Err(ClientError::Api { error, message });
592 }
593 json.get(key)
594 .ok_or_else(|| ClientError::Decode(format!("Missing '{key}' in response")))
595 .and_then(|v| {
596 serde_json::from_value(v.clone()).map_err(|e| ClientError::Decode(e.to_string()))
597 })
598}
599
600#[cfg(test)]
601mod tests {
602 use super::*;
603
604 fn cursor_stream(input: &str) -> SseStream {
607 let cursor = std::io::Cursor::new(input.as_bytes().to_vec());
608 SseStream::new(Box::new(cursor))
609 }
610
611 #[test]
614 fn client_new_strips_trailing_slash() {
615 let client = KoiClient::new("http://localhost:5641/");
617 assert!(
618 client.endpoint == "http://127.0.0.1:5641"
619 || client.endpoint == "http://[::1]:5641"
620 || client.endpoint == "http://localhost:5641",
621 "unexpected endpoint: {}",
622 client.endpoint
623 );
624 assert!(!client.endpoint.ends_with("/"));
625 assert!(client.token.is_empty());
626 }
627
628 #[test]
629 fn client_with_token_sets_token() {
630 let client = KoiClient::with_token("http://10.0.0.1:5641", "my-secret-token");
631 assert_eq!(client.endpoint, "http://10.0.0.1:5641");
632 assert_eq!(client.token, "my-secret-token");
633 }
634
635 #[test]
636 fn client_new_preserves_non_localhost() {
637 let client = KoiClient::new("http://10.0.0.1:5641");
638 assert_eq!(client.endpoint, "http://10.0.0.1:5641");
639 }
640
641 #[test]
642 fn client_new_strips_multiple_trailing_slashes() {
643 let client = KoiClient::new("http://localhost:5641///");
644 assert!(!client.endpoint.ends_with("/"));
645 }
646
647 #[test]
650 fn sse_stream_yields_parsed_json() {
651 let input = "data: {\"foo\": 1}\n\n";
652 let mut stream = cursor_stream(input);
653 let item = stream.next().unwrap().unwrap();
654 assert_eq!(item["foo"], 1);
655 }
656
657 #[test]
658 fn sse_stream_skips_empty_lines() {
659 let input = "\n\n\n\n";
660 let mut stream = cursor_stream(input);
661 assert!(stream.next().is_none());
662 }
663
664 #[test]
665 fn sse_stream_skips_non_data_lines() {
666 let input = "event: message\nretry: 1000\n\n";
667 let mut stream = cursor_stream(input);
668 assert!(stream.next().is_none());
669 }
670
671 #[test]
672 fn sse_stream_handles_leading_space() {
673 let input = "data: {\"hello\": \"world\"}\n";
674 let mut stream = cursor_stream(input);
675 let item = stream.next().unwrap().unwrap();
676 assert_eq!(item["hello"], "world");
677 }
678
679 #[test]
680 fn sse_stream_handles_no_space() {
681 let input = "data:{\"hello\":\"world\"}\n";
682 let mut stream = cursor_stream(input);
683 let item = stream.next().unwrap().unwrap();
684 assert_eq!(item["hello"], "world");
685 }
686
687 #[test]
688 fn sse_stream_yields_multiple_events() {
689 let input = "data: {\"n\": 1}\n\ndata: {\"n\": 2}\n\n";
690 let mut stream = cursor_stream(input);
691 let first = stream.next().unwrap().unwrap();
692 let second = stream.next().unwrap().unwrap();
693 assert_eq!(first["n"], 1);
694 assert_eq!(second["n"], 2);
695 }
696
697 #[test]
698 fn sse_stream_returns_none_on_eof() {
699 let input = "data: {\"n\": 1}\n";
700 let mut stream = cursor_stream(input);
701 let _ = stream.next();
702 assert!(stream.next().is_none());
703 }
704
705 #[test]
706 fn sse_stream_decode_error_on_invalid_json() {
707 let input = "data: {bad json}\n";
708 let mut stream = cursor_stream(input);
709 let item = stream.next().unwrap();
710 assert!(item.is_err());
711 }
712
713 #[test]
714 fn sse_stream_transport_error_on_read_failure() {
715 struct BrokenReader;
716 impl Read for BrokenReader {
717 fn read(&mut self, _buf: &mut [u8]) -> std::io::Result<usize> {
718 Err(std::io::Error::other("boom"))
719 }
720 }
721
722 let stream = SseStream::new(Box::new(BrokenReader));
723 let mut stream = stream;
724 let item = stream.next().unwrap();
725 assert!(item.is_err());
726 }
727}