use std::path::{Path, PathBuf};
use anyhow::{Context, Result};
use futures::StreamExt;
use kanade_shared::subject;
use kanade_shared::wire::LogsRequest;
use tracing::{info, warn};
const REPLY_BYTE_CAP: usize = 800 * 1024;
pub async fn serve(client: async_nats::Client, pc_id: String, log_path: PathBuf) {
let subject = subject::logs_fetch(&pc_id);
let mut sub = match client.subscribe(subject.clone()).await {
Ok(s) => s,
Err(e) => {
warn!(error = %e, subject = %subject, "subscribe logs.fetch failed");
return;
}
};
info!(subject = %subject, log_path = %log_path.display(), "logs.fetch handler ready");
while let Some(msg) = sub.next().await {
let Some(reply) = msg.reply.clone() else {
warn!("logs.fetch without reply subject — operator must use request/reply");
continue;
};
let req: LogsRequest = serde_json::from_slice(&msg.payload).unwrap_or_default();
let body = match read_tail(&log_path, req.tail_lines as usize).await {
Ok(b) => b,
Err(e) => {
let err_msg = format!("logs.fetch: {e:#}");
warn!(error = %e, "logs.fetch handler errored");
err_msg.into_bytes()
}
};
if let Err(e) = client.publish(reply, body.into()).await {
warn!(error = %e, "publish logs.fetch reply");
}
}
}
async fn read_tail(path: &Path, n: usize) -> Result<Vec<u8>> {
let active = locate_active_file(path).await?;
let bytes = tokio::fs::read(&active)
.await
.with_context(|| format!("read {active:?}"))?;
let text = String::from_utf8_lossy(&bytes);
let lines: Vec<&str> = text.lines().collect();
let start = lines.len().saturating_sub(n);
let mut out = lines[start..].join("\n");
if !out.is_empty() {
out.push('\n');
}
let bytes = out.into_bytes();
if bytes.len() <= REPLY_BYTE_CAP {
return Ok(bytes);
}
let cut = bytes.len() - REPLY_BYTE_CAP;
let mut start = cut;
while start < bytes.len() && (bytes[start] & 0b1100_0000) == 0b1000_0000 {
start += 1;
}
Ok(bytes[start..].to_vec())
}
async fn locate_active_file(template: &Path) -> Result<PathBuf> {
let dir = template
.parent()
.with_context(|| format!("log template '{}' has no parent dir", template.display()))?;
let stem = template
.file_stem()
.and_then(|s| s.to_str())
.unwrap_or("agent");
let ext = template
.extension()
.and_then(|s| s.to_str())
.unwrap_or("log");
let prefix = format!("{stem}.");
let suffix = format!(".{ext}");
let mut rd = tokio::fs::read_dir(dir)
.await
.with_context(|| format!("read_dir {dir:?}"))?;
let mut best: Option<PathBuf> = None;
while let Some(entry) = rd.next_entry().await? {
let name = entry.file_name();
let Some(name) = name.to_str() else { continue };
if !name.starts_with(&prefix) || !name.ends_with(&suffix) {
continue;
}
let p = entry.path();
match &best {
Some(curr) if name <= curr.file_name().unwrap().to_str().unwrap() => {}
_ => best = Some(p),
}
}
Ok(best.unwrap_or_else(|| template.to_path_buf()))
}