use std::collections::HashSet;
use std::path::Path;
use std::process::Stdio;
use std::string::FromUtf8Error;
use std::sync::Arc;
use std::time::{Duration, Instant};
use thiserror::Error;
use tokio::process::Command;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::Instrument;
use crate::context::Issues;
use crate::shell::{CommandExecError, CommandExt};
use crate::workflow::Workflow;
use super::event::EventProducer;
const PULL_COMMAND_TIMEOUT: Duration = Duration::from_secs(60);
const STDERR_TAIL_BYTES: usize = 4096;
#[derive(Clone)]
pub(super) struct IntakeLoop {
workflow: Arc<Workflow>,
producer: EventProducer,
}
impl IntakeLoop {
pub(super) fn new(workflow: Arc<Workflow>, producer: EventProducer) -> Self {
Self { workflow, producer }
}
pub(super) fn start(self, shutdown: CancellationToken) -> JoinHandle<()> {
let span = tracing::info_span!("intake");
tokio::spawn(async move { self.run(shutdown).await }.instrument(span))
}
async fn run(self, shutdown: CancellationToken) {
let max_iterations = self.workflow.schema().loop_.max_iterations;
let mut iterations = 0_u64;
loop {
if shutdown.is_cancelled() || max_iterations.is_some_and(|max| iterations >= max) {
break;
}
iterations = iterations.saturating_add(1);
match self.run_once(&shutdown).await {
Ok(()) => {},
Err(IntakeError::Cancelled) if shutdown.is_cancelled() => break,
Err(error) => self.producer.intake_failed(error).await,
}
let wait = Duration::from_secs(self.workflow.schema().issues.pull.idle_sec);
tokio::select! {
_ = shutdown.cancelled() => break,
_ = tokio::time::sleep(wait) => {},
}
}
self.producer.intake_stopped().await;
}
async fn run_once(&self, shutdown: &CancellationToken) -> Result<(), IntakeError> {
let stdout = run_pull_command(self.workflow.as_ref(), shutdown).await?;
let issues = parse_issues_output(&stdout)?;
tracing::info!(candidates = issues.len(), "issues pulled");
let mut seen = HashSet::new();
for issue in issues.iter().cloned() {
if seen.insert(issue.id.clone()) {
self.producer.intake_issue(issue).await;
} else {
tracing::warn!(
issue_id = %issue.id,
"duplicate issue id from intake; keeping first issue",
);
}
}
Ok(())
}
}
async fn run_pull_command(workflow: &Workflow, shutdown: &CancellationToken) -> Result<String, IntakeError> {
let command = &workflow.schema().issues.pull.command;
let cwd = workflow.workflow_path().parent().unwrap_or_else(|| Path::new("."));
let started = Instant::now();
let mut cmd = shell_command(command);
cmd.current_dir(cwd).stdout(Stdio::piped()).stderr(Stdio::piped());
tracing::debug!(
cwd = %cwd.display(),
"issue pull command starting",
);
let mut child = cmd.timeout(PULL_COMMAND_TIMEOUT).spawn().map_err(IntakeError::PullCommand)?;
let output = tokio::select! {
result = child.wait_with_output() => result.map_err(IntakeError::PullCommand)?,
_ = shutdown.cancelled() => {
child.cancel();
let _ = child.wait().await;
return Err(IntakeError::Cancelled);
},
};
let duration_ms = started.elapsed().as_millis() as u64;
if !output.status.success() {
return Err(IntakeError::PullCommandExit {
code: output.status.code().unwrap_or(-1),
stderr_tail: tail_utf8(&output.stderr, STDERR_TAIL_BYTES),
});
}
tracing::info!(duration_ms, "issues pull command completed");
String::from_utf8(output.stdout).map_err(IntakeError::PullCommandStdout)
}
fn parse_issues_output(output: &str) -> Result<Issues, IntakeError> {
serde_json::from_str(output).map_err(IntakeError::ParseIssues)
}
fn tail_utf8(bytes: &[u8], limit: usize) -> String {
if bytes.len() <= limit {
return String::from_utf8_lossy(bytes).into_owned();
}
let start = bytes.len() - limit;
String::from_utf8_lossy(&bytes[start..]).into_owned()
}
#[cfg(windows)]
fn shell_command(command: &str) -> Command {
let mut shell = Command::new("cmd");
shell.args(["/C", command]);
shell
}
#[cfg(not(windows))]
fn shell_command(command: &str) -> Command {
let mut shell = Command::new("sh");
shell.args(["-c", command]);
shell
}
#[derive(Debug, Error)]
enum IntakeError {
#[error(transparent)]
PullCommand(CommandExecError),
#[error("issue pull command exited with code {code}: {stderr_tail}")]
PullCommandExit { code: i32, stderr_tail: String },
#[error("issue pull command stdout was not valid UTF-8: {0}")]
PullCommandStdout(FromUtf8Error),
#[error("failed to parse issue pull JSON: {0}")]
ParseIssues(serde_json::Error),
#[error("issue pull command cancelled")]
Cancelled,
}
#[cfg(all(test, not(windows)))]
mod tests {
use std::sync::Arc;
use super::super::event::{IntakeEvent, OrchestratorEvent, event_channel};
use super::*;
#[test]
fn parses_pull_stdout_as_issue_json() {
let issues = parse_issues_output(
r#"[
{"id":"123","title":"Add retry tests","state":"todo","labels":["vik"]},
{"identifier":"LIN-1","title":"Ship state alias","status":"work"}
]"#,
)
.expect("issues parse");
assert_eq!(issues.len(), 2);
assert_eq!(issues[0].id, "123");
assert_eq!(issues[0].state, "todo");
assert_eq!(issues[1].id, "LIN-1");
assert_eq!(issues[1].state, "work");
}
#[test]
fn stderr_tail_is_limited() {
let tail = tail_utf8(b"abcdef", 3);
assert_eq!(tail, "def");
}
#[tokio::test]
async fn runs_pull_command_from_workflow_directory() {
let cwd = std::env::current_dir().expect("cwd");
let workflow = Workflow::builder()
.pull_command("pwd")
.workflow_path(cwd.join("workflow.yml"))
.build();
let stdout = run_pull_command(&workflow, &CancellationToken::new())
.await
.expect("pull command runs");
let actual = std::fs::canonicalize(stdout.trim()).expect("canonical actual cwd");
let expected = std::fs::canonicalize(cwd).expect("canonical workflow dir");
assert_eq!(actual, expected);
}
#[tokio::test]
async fn nonzero_pull_command_keeps_stderr_tail() {
let cwd = std::env::current_dir().expect("cwd");
let workflow = Workflow::builder()
.pull_command("printf '%s' 'broken' >&2; exit 7")
.workflow_path(cwd.join("workflow.yml"))
.build();
let err = run_pull_command(&workflow, &CancellationToken::new())
.await
.expect_err("pull command must fail");
assert!(matches!(
err,
IntakeError::PullCommandExit {
code: 7,
ref stderr_tail
} if stderr_tail == "broken"
));
}
#[tokio::test]
async fn run_once_emits_issues_from_command_json() {
let cwd = std::env::current_dir().expect("cwd");
let workflow = Workflow::builder()
.pull_command(r#"printf '%s' '[{"id":"ABC-1","title":"Pulled","state":"todo"}]'"#)
.workflow_path(cwd.join("workflow.yml"))
.build();
let (producer, mut consumer) = event_channel();
let intake = IntakeLoop::new(Arc::new(workflow), producer);
intake.run_once(&CancellationToken::new()).await.expect("intake runs");
match consumer.recv().await.expect("event") {
OrchestratorEvent::Intake(IntakeEvent::Issue(issue)) => {
assert_eq!(issue.id, "ABC-1");
assert_eq!(issue.title, "Pulled");
assert_eq!(issue.state, "todo");
},
_ => panic!("expected intake issue"),
}
}
}