use std::path::{Path, PathBuf};
use std::time::Duration;
use tokio::io::AsyncReadExt;
use tokio::sync::mpsc::UnboundedSender;
use tokio_util::sync::CancellationToken;
use crate::bee_log::parse_line;
use crate::components::log_pane::{BeeLogLine, LogTab};
pub const POLL_INTERVAL: Duration = Duration::from_millis(200);
const OPEN_RETRY_INTERVAL: Duration = Duration::from_millis(100);
const OPEN_RETRY_LIMIT: u32 = 50;
pub fn spawn(
log_path: PathBuf,
tx: UnboundedSender<(LogTab, BeeLogLine)>,
cancel: CancellationToken,
) {
tokio::spawn(async move {
run(log_path, tx, cancel).await;
});
}
async fn run(
log_path: PathBuf,
tx: UnboundedSender<(LogTab, BeeLogLine)>,
cancel: CancellationToken,
) {
let mut file = match open_with_retry(&log_path, &cancel).await {
Some(f) => f,
None => {
tracing::warn!(
"bee-log tailer: gave up opening {log_path:?} after retries; \
the bee-side log tabs will stay empty"
);
return;
}
};
tracing::info!("bee-log tailer: following {log_path:?}");
let mut current_inode: Option<u64> = inode_of_open_file(&file).await;
let mut cursor: u64 = 0;
let mut leftover = String::new();
let mut buf = vec![0u8; 8 * 1024];
loop {
tokio::select! {
_ = cancel.cancelled() => {
tracing::debug!("bee-log tailer: cancelled, exiting");
break;
}
_ = tokio::time::sleep(POLL_INTERVAL) => {}
}
loop {
match file.read(&mut buf).await {
Ok(0) => break, Ok(n) => {
cursor += n as u64;
let chunk = String::from_utf8_lossy(&buf[..n]);
leftover.push_str(&chunk);
let mut last_end = 0usize;
let mut emit = Vec::<&str>::new();
for (idx, _) in leftover.match_indices('\n') {
emit.push(&leftover[last_end..idx]);
last_end = idx + 1;
}
let emitted_lines: Vec<String> = emit.iter().map(|s| s.to_string()).collect();
let new_leftover = leftover[last_end..].to_string();
leftover = new_leftover;
for line in emitted_lines {
let Some(entry) = parse_line(&line) else {
continue;
};
let Some(tab) = entry.tab() else {
continue;
};
if tab == LogTab::BeeHttp && entry.is_bee_tui_request() {
continue;
}
if tx.send((tab, entry.to_log_line())).is_err() {
tracing::debug!("bee-log tailer: receiver dropped; exiting");
return;
}
}
}
Err(e) => {
tracing::warn!("bee-log tailer: read error on {log_path:?}: {e}");
break;
}
}
}
if let Some((path_inode, path_size)) = stat_path(&log_path).await {
let rotated = current_inode.is_some_and(|ino| ino != path_inode);
let truncated = path_size < cursor;
if rotated || truncated {
tracing::info!(
"bee-log tailer: rotation detected (rotated={rotated}, \
truncated={truncated}); re-opening {log_path:?}"
);
if let Some(new_file) = reopen(&log_path).await {
file = new_file;
current_inode = inode_of_open_file(&file).await;
cursor = 0;
leftover.clear();
}
}
}
}
}
async fn reopen(path: &Path) -> Option<tokio::fs::File> {
match tokio::fs::File::open(path).await {
Ok(f) => Some(f),
Err(e) => {
tracing::warn!("bee-log tailer: failed to re-open {path:?} after rotation: {e}");
None
}
}
}
async fn stat_path(path: &Path) -> Option<(u64, u64)> {
let meta = tokio::fs::metadata(path).await.ok()?;
#[cfg(unix)]
{
use std::os::unix::fs::MetadataExt;
Some((meta.ino(), meta.len()))
}
#[cfg(not(unix))]
{
Some((meta.len(), meta.len()))
}
}
async fn inode_of_open_file(file: &tokio::fs::File) -> Option<u64> {
let meta = file.metadata().await.ok()?;
#[cfg(unix)]
{
use std::os::unix::fs::MetadataExt;
Some(meta.ino())
}
#[cfg(not(unix))]
{
Some(meta.len())
}
}
async fn open_with_retry(path: &PathBuf, cancel: &CancellationToken) -> Option<tokio::fs::File> {
for _ in 0..OPEN_RETRY_LIMIT {
if cancel.is_cancelled() {
return None;
}
match tokio::fs::File::open(path).await {
Ok(f) => return Some(f),
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
tokio::time::sleep(OPEN_RETRY_INTERVAL).await;
}
Err(e) => {
tracing::warn!("bee-log tailer: cannot open {path:?}: {e}");
return None;
}
}
}
None
}
#[cfg(test)]
mod tests {
use super::*;
use tokio::io::AsyncWriteExt;
use tokio::sync::mpsc::unbounded_channel;
async fn make_temp_file() -> (PathBuf, tokio::fs::File) {
let path = std::env::temp_dir().join(format!(
"bee-log-tailer-test-{}.log",
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos()
));
let f = tokio::fs::OpenOptions::new()
.create(true)
.append(true)
.open(&path)
.await
.unwrap();
(path, f)
}
#[tokio::test]
async fn forwards_parsed_lines_to_channel() {
let (path, mut f) = make_temp_file().await;
let (tx, mut rx) = unbounded_channel();
let cancel = CancellationToken::new();
spawn(path.clone(), tx, cancel.clone());
f.write_all(
b"\"time\"=\"2026-05-07 22:14:19.000000\" \"level\"=\"error\" \"logger\"=\"node/foo\" \"msg\"=\"boom\"\n",
)
.await
.unwrap();
f.write_all(b"\"time\"=\"t2\" \"level\"=\"debu")
.await
.unwrap();
f.flush().await.unwrap();
let received = tokio::time::timeout(Duration::from_secs(2), rx.recv()).await;
cancel.cancel();
let _ = std::fs::remove_file(&path);
let (tab, line) = received
.expect("first line should arrive")
.expect("channel open");
assert_eq!(tab, LogTab::Errors);
assert_eq!(line.timestamp, "2026-05-07 22:14:19.000000");
assert_eq!(line.logger, "node/foo");
assert!(line.message.starts_with("boom"));
}
#[tokio::test]
async fn cancel_stops_the_task() {
let (path, _) = make_temp_file().await;
let (tx, _rx) = unbounded_channel();
let cancel = CancellationToken::new();
spawn(path.clone(), tx, cancel.clone());
cancel.cancel();
tokio::time::sleep(Duration::from_millis(50)).await;
let _ = std::fs::remove_file(&path);
}
#[tokio::test]
async fn survives_rotation_via_rename() {
let (path, mut f) = make_temp_file().await;
let (tx, mut rx) = unbounded_channel();
let cancel = CancellationToken::new();
spawn(path.clone(), tx, cancel.clone());
f.write_all(b"\"time\"=\"t1\" \"level\"=\"info\" \"logger\"=\"node\" \"msg\"=\"first\"\n")
.await
.unwrap();
f.flush().await.unwrap();
let recv1 = tokio::time::timeout(Duration::from_secs(2), rx.recv()).await;
let (_, first_line) = recv1
.expect("first line should arrive")
.expect("channel open");
assert_eq!(first_line.timestamp, "t1");
drop(f);
let rotated = path.with_extension("log.1");
std::fs::rename(&path, &rotated).unwrap();
let mut f2 = tokio::fs::OpenOptions::new()
.create(true)
.append(true)
.open(&path)
.await
.unwrap();
f2.write_all(
b"\"time\"=\"t2\" \"level\"=\"info\" \"logger\"=\"node\" \"msg\"=\"second\"\n",
)
.await
.unwrap();
f2.flush().await.unwrap();
let recv2 = tokio::time::timeout(Duration::from_secs(3), rx.recv()).await;
cancel.cancel();
let _ = std::fs::remove_file(&path);
let _ = std::fs::remove_file(&rotated);
let (_, second_line) = recv2
.expect("post-rotation line should arrive")
.expect("channel open");
assert_eq!(second_line.timestamp, "t2");
}
#[tokio::test]
async fn unknown_severity_lines_are_dropped() {
let (path, mut f) = make_temp_file().await;
let (tx, mut rx) = unbounded_channel();
let cancel = CancellationToken::new();
spawn(path.clone(), tx, cancel.clone());
f.write_all(b"\"time\"=\"t1\" \"level\"=\"panic\" \"logger\"=\"node\" \"msg\"=\"x\"\n")
.await
.unwrap();
f.write_all(b"\"time\"=\"t2\" \"level\"=\"info\" \"logger\"=\"node\" \"msg\"=\"y\"\n")
.await
.unwrap();
f.flush().await.unwrap();
let recv = tokio::time::timeout(Duration::from_secs(2), rx.recv()).await;
cancel.cancel();
let _ = std::fs::remove_file(&path);
let (tab, line) = recv.expect("info line should arrive").expect("channel");
assert_eq!(tab, LogTab::Info);
assert_eq!(line.timestamp, "t2");
}
}