use std::sync::Arc;
use tokio::io::AsyncBufReadExt;
use tokio::task::JoinHandle;
use meerkat_core::SessionId;
use meerkat_runtime::SessionServiceRuntimeExt as _;
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum StdinLineFormat {
Text,
Json,
}
pub fn parse_stdin_line(line: &str, format: StdinLineFormat) -> String {
match format {
StdinLineFormat::Text => line.to_string(),
StdinLineFormat::Json => meerkat_comms::transport::plain_codec::parse_plain_line(line),
}
}
fn make_stdin_external_event_input(
body: String,
format: StdinLineFormat,
) -> meerkat_runtime::Input {
meerkat_runtime::Input::ExternalEvent(meerkat_runtime::ExternalEventInput {
header: meerkat_runtime::InputHeader {
id: meerkat_core::lifecycle::InputId::new(),
timestamp: chrono::Utc::now(),
source: meerkat_runtime::InputOrigin::External {
source_name: "stdin".to_string(),
},
durability: meerkat_runtime::InputDurability::Durable,
visibility: meerkat_runtime::InputVisibility::default(),
idempotency_key: None,
supersession_key: None,
correlation_id: None,
},
event_type: "stdin".to_string(),
payload: match format {
StdinLineFormat::Text => serde_json::json!({ "body": body }),
StdinLineFormat::Json => match serde_json::from_str::<serde_json::Value>(&body) {
Ok(parsed) => serde_json::json!({ "body": parsed }),
Err(_) => serde_json::json!({ "body": body }),
},
},
blocks: None,
handling_mode: meerkat_core::types::HandlingMode::Queue,
render_metadata: None,
})
}
pub fn spawn_stdin_reader(
runtime_adapter: Arc<meerkat_runtime::MeerkatMachine>,
session_id: SessionId,
format: StdinLineFormat,
) -> JoinHandle<()> {
tokio::spawn(async move {
let stdin = tokio::io::stdin();
let reader = tokio::io::BufReader::new(stdin);
let mut lines = reader.lines();
while let Ok(Some(line)) = lines.next_line().await {
let body = parse_stdin_line(&line, format);
if body.is_empty() {
continue;
}
match runtime_adapter
.accept_input(&session_id, make_stdin_external_event_input(body, format))
.await
{
Ok(
meerkat_runtime::AcceptOutcome::Accepted { .. }
| meerkat_runtime::AcceptOutcome::Deduplicated { .. },
) => {}
Ok(meerkat_runtime::AcceptOutcome::Rejected { reason }) => {
tracing::warn!("Stdin reader: runtime rejected stdin event: {reason}");
}
Ok(outcome) => {
tracing::warn!(
"Stdin reader: unexpected runtime accept outcome, exiting: {outcome:?}"
);
return;
}
Err(meerkat_runtime::RuntimeDriverError::Destroyed) => {
tracing::debug!("Stdin reader: runtime destroyed, exiting");
return;
}
Err(meerkat_runtime::RuntimeDriverError::NotReady { state }) => {
tracing::warn!(
"Stdin reader: runtime not ready for stdin event while in state {state}"
);
}
Err(err) => {
tracing::warn!("Stdin reader: runtime admission failed: {err}");
return;
}
}
}
tracing::debug!("Stdin reader: EOF, exiting");
})
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
use super::*;
#[test]
fn test_parse_stdin_line_json_with_body() {
assert_eq!(
parse_stdin_line(r#"{"body":"hello"}"#, StdinLineFormat::Json),
"hello"
);
}
#[test]
fn test_parse_stdin_line_plain_text() {
assert_eq!(
parse_stdin_line("plain text", StdinLineFormat::Text),
"plain text"
);
}
#[test]
fn test_parse_stdin_line_json_without_body() {
let result = parse_stdin_line(r#"{"event":"email","from":"john"}"#, StdinLineFormat::Json);
assert!(result.contains("email"));
assert!(result.contains("john"));
}
#[test]
fn test_parse_stdin_line_empty() {
assert_eq!(parse_stdin_line("", StdinLineFormat::Text), "");
}
#[test]
fn test_make_stdin_external_event_input_uses_runtime_external_event_shape() {
let input = make_stdin_external_event_input("hello".to_string(), StdinLineFormat::Text);
let meerkat_runtime::Input::ExternalEvent(event) = input else {
panic!("expected external event input");
};
assert_eq!(event.event_type, "stdin");
assert_eq!(event.payload["body"], "hello");
assert_eq!(event.blocks, None);
assert!(matches!(
event.header.source,
meerkat_runtime::InputOrigin::External { ref source_name } if source_name == "stdin"
));
}
#[test]
fn test_make_stdin_external_event_input_text_mode_preserves_json_looking_literal() {
let input = make_stdin_external_event_input(
r#"{"level":"info"}"#.to_string(),
StdinLineFormat::Text,
);
let meerkat_runtime::Input::ExternalEvent(event) = input else {
panic!("expected external event input");
};
assert_eq!(event.payload["body"], r#"{"level":"info"}"#);
}
#[test]
fn test_make_stdin_external_event_input_json_mode_preserves_structured_json() {
let input = make_stdin_external_event_input(
r#"{"level":"info"}"#.to_string(),
StdinLineFormat::Json,
);
let meerkat_runtime::Input::ExternalEvent(event) = input else {
panic!("expected external event input");
};
assert_eq!(event.payload["body"]["level"], "info");
}
}