stormchaser-cli 1.3.2

A robust, distributed workflow engine for event-driven and human-triggered workflows.
Documentation
use crate::utils::{handle_response, require_token};
use anyhow::Result;

pub async fn watch_run(
    url: &str,
    token: Option<&str>,
    http_client: &reqwest_middleware::ClientWithMiddleware,
    id: stormchaser_model::RunId,
) -> Result<()> {
    let token = require_token(token)?;
    let res = http_client
        .get(format!("{}/api/v1/runs/{}/status/stream", url, id))
        .header(reqwest::header::AUTHORIZATION, format!("Bearer {}", token))
        .send()
        .await?;

    if !res.status().is_success() {
        return handle_response(res).await;
    }

    use eventsource_stream::Eventsource;
    use futures::stream::StreamExt;
    let mut stream = res.bytes_stream().eventsource();
    while let Some(event) = stream.next().await {
        match event {
            Ok(event) => {
                if event.event == "error" {
                    eprintln!("Error from stream: {}", event.data);
                    break;
                }
                println!("{}: {}", event.event, event.data);
            }
            Err(e) => {
                eprintln!("Stream error: {}", e);
                break;
            }
        }
    }
    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;
    use reqwest_middleware::ClientBuilder;
    use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
    use wiremock::matchers::{method, path};
    use wiremock::{Mock, MockServer, ResponseTemplate};

    #[tokio::test]
    async fn test_watch_run_success() {
        let server = MockServer::start().await;
        let id = stormchaser_model::RunId::new_v4();

        let sse_body = "event: status\ndata: running\n\n\
                        event: error\ndata: stream ended\n\n";

        Mock::given(method("GET"))
            .and(path(format!("/api/v1/runs/{}/status/stream", id)))
            .respond_with(
                ResponseTemplate::new(200)
                    .insert_header("content-type", "text/event-stream")
                    .set_body_string(sse_body),
            )
            .mount(&server)
            .await;

        let retry_policy = ExponentialBackoff::builder().build_with_max_retries(0);
        let client = ClientBuilder::new(reqwest::Client::new())
            .with(RetryTransientMiddleware::new_with_policy(retry_policy))
            .build();

        let result = watch_run(&server.uri(), Some("fake_token"), &client, id).await;

        assert!(result.is_ok());
    }
}