use crate::cli::{print_value, DaemonClient, OutputFormat};
use anyhow::Result;
use base64::Engine;
pub async fn publish(client: &DaemonClient, topic: &str, payload: &str) -> Result<()> {
client.ensure_running().await?;
let encoded = base64::engine::general_purpose::STANDARD.encode(payload.as_bytes());
let body = serde_json::json!({
"topic": topic,
"payload": encoded,
});
let resp = client.post("/publish", &body).await?;
print_value(client.format(), &resp);
Ok(())
}
pub async fn subscribe(client: &DaemonClient, topic: &str) -> Result<()> {
client.ensure_running().await?;
let body = serde_json::json!({ "topic": topic });
let sub_resp = client.post("/subscribe", &body).await?;
let sub_id = sub_resp
.get("subscription_id")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
eprintln!("Subscribed to '{topic}' (id: {sub_id}). Streaming events... (Ctrl+C to stop)");
stream_sse(client, "/events").await?;
let _ = client.delete(&format!("/subscribe/{sub_id}")).await;
Ok(())
}
pub async fn unsubscribe(client: &DaemonClient, id: &str) -> Result<()> {
client.ensure_running().await?;
let resp = client.delete(&format!("/subscribe/{id}")).await?;
print_value(client.format(), &resp);
Ok(())
}
pub async fn events(client: &DaemonClient) -> Result<()> {
client.ensure_running().await?;
eprintln!("Streaming events... (Ctrl+C to stop)");
stream_sse(client, "/events").await
}
async fn stream_sse(client: &DaemonClient, path: &str) -> Result<()> {
use futures::StreamExt;
let resp = client.get_stream(path).await?;
let mut stream = resp.bytes_stream();
let mut buffer = String::new();
while let Some(chunk) = stream.next().await {
let chunk = chunk?;
buffer.push_str(&String::from_utf8_lossy(&chunk));
while let Some(pos) = buffer.find("\n\n") {
let frame = buffer[..pos].to_string();
buffer = buffer[pos + 2..].to_string();
for line in frame.lines() {
if let Some(data) = line.strip_prefix("data: ") {
match client.format() {
OutputFormat::Json => println!("{data}"),
OutputFormat::Text => {
if let Ok(val) = serde_json::from_str::<serde_json::Value>(data) {
print_value(OutputFormat::Text, &val);
} else {
println!("{data}");
}
}
}
}
}
}
}
Ok(())
}
#[cfg(test)]
mod tests {
#![allow(clippy::unwrap_used)]
use super::*;
use crate::cli::DaemonClient;
#[allow(dead_code)]
async fn start_mock_server(
response_json: serde_json::Value,
) -> (String, tokio::sync::oneshot::Sender<()>) {
use std::sync::Arc;
let json = Arc::new(response_json);
let app = axum::Router::new().fallback(move |_req: axum::extract::Request| {
let json = Arc::clone(&json);
async move {
let body = serde_json::to_vec(&*json).unwrap();
axum::response::Response::builder()
.status(200)
.header("content-type", "application/json")
.body(axum::body::Body::from(body))
.unwrap()
}
});
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let (tx, rx) = tokio::sync::oneshot::channel::<()>();
tokio::spawn(async move {
axum::serve(listener, app.into_make_service())
.with_graceful_shutdown(async {
rx.await.ok();
})
.await
.ok();
});
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
(format!("http://{}", addr), tx)
}
#[tokio::test]
async fn subscribe_returns_mock_response() {
let mock_resp = serde_json::json!({"topics": ["test-topic"]});
let (url, _shutdown) = start_mock_server(mock_resp).await;
let client = DaemonClient::new(None, Some(&url), crate::cli::OutputFormat::Json).unwrap();
let result = subscribe(&client, "test-topic").await;
assert!(result.is_ok(), "subscribe should succeed: {:?}", result);
}
}