use std::path::PathBuf;
use serde::Serialize;
use thiserror::Error;
use tokio::sync::{mpsc, oneshot};
use tracing::warn;
use crate::parser::SourceKind;
#[derive(Debug, Clone, Serialize)]
pub struct EventRecord {
pub event: String,
pub data: serde_json::Value,
pub raw: Option<String>,
pub event_id: Option<String>,
pub ts_ms: u64,
}
impl EventRecord {
pub fn from_line(line: String, now_ms: u64) -> Self {
Self {
event: "line".into(),
data: serde_json::Value::String(line.clone()),
raw: Some(line),
event_id: None,
ts_ms: now_ms,
}
}
pub fn lifecycle(kind: &str, reason: serde_json::Value, now_ms: u64) -> Self {
Self {
event: kind.into(),
data: reason,
raw: None,
event_id: None,
ts_ms: now_ms,
}
}
}
#[derive(Debug, Error)]
pub enum WatchStartError {
#[error("limit reached ({current}/{max})")]
LimitReached { current: usize, max: usize },
#[error("invalid path: {0}")]
InvalidPath(String),
#[error("invalid regex: {0}")]
InvalidRegex(String),
#[error("invalid jq expression: {0}")]
InvalidJq(String),
#[error("unresolved env var: {0}")]
UnresolvedEnv(String),
#[error("shell exited immediately (code={0:?})")]
SourceFailedImmediately(Option<i32>),
#[error("internal error: {0}")]
Internal(String),
}
pub enum SourceImpl {
File(FileSource),
Shell(ShellSource),
Sse(SseSource),
}
pub struct FileSource {
pub path: PathBuf,
}
pub struct ShellSource {
pub cmd: String,
}
pub struct SseSource {
pub url: String,
pub headers: Vec<(String, String)>,
}
impl SseSource {
pub fn build(url: &str, headers: &[(String, String)]) -> Result<Self, WatchStartError> {
let url = crate::sse::substitute_env_vars(url)
.map_err(|e| WatchStartError::UnresolvedEnv(e.to_string()))?;
let mut subst_headers = Vec::with_capacity(headers.len());
for (k, v) in headers {
let v2 = crate::sse::substitute_env_vars(v)
.map_err(|e| WatchStartError::UnresolvedEnv(e.to_string()))?;
subst_headers.push((k.clone(), v2));
}
Ok(Self {
url,
headers: subst_headers,
})
}
}
impl SourceImpl {
pub async fn run(self, tx: mpsc::Sender<EventRecord>, stop: oneshot::Receiver<()>) {
match self {
SourceImpl::File(s) => run_file(s, tx, stop).await,
SourceImpl::Shell(s) => run_shell(s, tx, stop).await,
SourceImpl::Sse(s) => crate::sse::run_sse(s, tx, stop).await,
}
}
pub fn kind(&self) -> SourceKind {
match self {
SourceImpl::File(_) => SourceKind::File,
SourceImpl::Shell(_) => SourceKind::Shell,
SourceImpl::Sse(_) => SourceKind::Sse,
}
}
}
async fn run_file(src: FileSource, tx: mpsc::Sender<EventRecord>, mut stop: oneshot::Receiver<()>) {
use tokio::{
fs::File,
io::{AsyncBufReadExt, AsyncSeekExt, BufReader, SeekFrom},
};
let open = File::open(&src.path).await;
let file = match open {
Ok(mut f) => {
if let Err(e) = f.seek(SeekFrom::End(0)).await {
warn!(path = %src.path.display(), "watch file seek failed: {e}");
}
f
}
Err(e) => {
let _ = tx
.send(EventRecord::lifecycle(
"_error",
serde_json::json!({ "msg": format!("open failed: {e}") }),
now_ms(),
))
.await;
return;
}
};
let mut current_inode = inode_of(&file).await;
let mut reader = BufReader::new(file);
let mut line = String::new();
let mut interval = tokio::time::interval(std::time::Duration::from_millis(200));
loop {
tokio::select! {
_ = &mut stop => return,
_ = interval.tick() => {}
}
loop {
line.clear();
match reader.read_line(&mut line).await {
Ok(0) => break,
Ok(_) => {
let stripped = line.trim_end_matches(&['\r', '\n'][..]).to_owned();
if tx
.send(EventRecord::from_line(stripped, now_ms()))
.await
.is_err()
{
return;
}
}
Err(_) => break,
}
}
if let Ok(metadata) = tokio::fs::metadata(&src.path).await {
let now_size = metadata.len();
let pos = reader.get_mut().stream_position().await.unwrap_or(0);
let new_inode = inode_from_metadata(&metadata);
let inode_changed =
current_inode.is_some() && new_inode.is_some() && current_inode != new_inode;
if inode_changed || now_size < pos {
if let Ok(f) = File::open(&src.path).await {
current_inode = inode_from_metadata(&metadata);
reader = BufReader::new(f);
}
}
}
}
}
async fn run_shell(
src: ShellSource,
tx: mpsc::Sender<EventRecord>,
mut stop: oneshot::Receiver<()>,
) {
use tokio::{
io::{AsyncBufReadExt, BufReader},
process::Command,
};
let (program, arg) = if cfg!(target_os = "windows") {
("powershell", "-Command")
} else {
("sh", "-c")
};
let mut cmd = Command::new(program);
cmd.arg(arg)
.arg(&src.cmd)
.stdin(std::process::Stdio::null())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.kill_on_drop(true);
#[cfg(windows)]
{
use std::os::windows::process::CommandExt;
cmd.creation_flags(0x08000000);
}
let mut child = match cmd.spawn() {
Ok(c) => c,
Err(e) => {
let _ = tx
.send(EventRecord::lifecycle(
"_error",
serde_json::json!({ "msg": format!("spawn failed: {e}") }),
now_ms(),
))
.await;
return;
}
};
let stdout = child.stdout.take().expect("piped");
let stderr = child.stderr.take().expect("piped");
let (line_tx, mut line_rx) = mpsc::channel::<String>(64);
let lt1 = line_tx.clone();
let r1 = tokio::spawn(async move {
let mut r = BufReader::new(stdout).lines();
while let Ok(Some(line)) = r.next_line().await {
if lt1.send(line).await.is_err() {
break;
}
}
});
let r2 = tokio::spawn(async move {
let mut r = BufReader::new(stderr).lines();
while let Ok(Some(line)) = r.next_line().await {
if line_tx.send(line).await.is_err() {
break;
}
}
});
loop {
tokio::select! {
_ = &mut stop => {
let _ = child.start_kill();
break;
}
line = line_rx.recv() => {
match line {
Some(l) => {
if tx.send(EventRecord::from_line(l, now_ms())).await.is_err() {
let _ = child.start_kill();
break;
}
}
None => {
let exit = child.wait().await.ok().and_then(|s| s.code());
let _ = tx
.send(EventRecord::lifecycle(
"_disconnect",
serde_json::json!({ "reason": "process_exited", "code": exit }),
now_ms(),
))
.await;
break;
}
}
}
}
}
if let Err(e) = r1.await {
warn!("watch shell stdout reader task failed: {e}");
}
if let Err(e) = r2.await {
warn!("watch shell stderr reader task failed: {e}");
}
}
fn now_ms() -> u64 {
use std::time::{SystemTime, UNIX_EPOCH};
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0)
}
async fn inode_of(file: &tokio::fs::File) -> Option<u64> {
let metadata = file.metadata().await.ok()?;
inode_from_metadata(&metadata)
}
#[cfg(unix)]
fn inode_from_metadata(m: &std::fs::Metadata) -> Option<u64> {
use std::os::unix::fs::MetadataExt;
Some(m.ino())
}
#[cfg(not(unix))]
fn inode_from_metadata(_m: &std::fs::Metadata) -> Option<u64> {
None
}