torc 0.22.1

Workflow management system
use super::*;

pub(crate) async fn handle_workflow_events_stream<C, B>(
    server: Server<C>,
    workflow_id: i64,
    request: Request<B>,
    context: C,
) -> Response<Body>
where
    B: HttpBody + Send + 'static,
    B::Data: Send,
    B::Error: std::fmt::Display,
    C: Has<XSpanIdString> + Has<Option<Authorization>> + Send + Sync + 'static,
{
    let min_severity = parse_event_stream_level(request.uri().query());

    match server.get_workflow(workflow_id, &context).await {
        Ok(GetWorkflowResponse::SuccessfulResponse(_)) => {}
        Ok(GetWorkflowResponse::ForbiddenErrorResponse(body)) => {
            return json_response_with_status(&body, StatusCode::FORBIDDEN);
        }
        Ok(GetWorkflowResponse::NotFoundErrorResponse(body)) => {
            return json_response_with_status(&body, StatusCode::NOT_FOUND);
        }
        Ok(GetWorkflowResponse::DefaultErrorResponse(body)) => {
            return json_response_with_status(&body, StatusCode::INTERNAL_SERVER_ERROR);
        }
        Err(err) => return error_response(StatusCode::INTERNAL_SERVER_ERROR, err.0),
    }

    let mut receiver = server.subscribe_to_events();
    let stream = async_stream::stream! {
        loop {
            match receiver.recv().await {
                Ok(event)
                    if event.workflow_id == workflow_id && event.severity >= min_severity =>
                {
                    let data = serde_json::to_string(&event).unwrap_or_default();
                    yield Ok::<_, std::convert::Infallible>(
                        format!("event: {}\ndata: {}\n\n", event.event_type, data)
                    );
                }
                Ok(_) => continue,
                Err(tokio::sync::broadcast::error::RecvError::Lagged(count)) => {
                    yield Ok::<_, std::convert::Infallible>(
                        format!("event: warning\ndata: {{\"dropped\": {}}}\n\n", count)
                    );
                }
                Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
            }
        }
    };

    Response::builder()
        .status(StatusCode::OK)
        .header(CONTENT_TYPE, "text/event-stream")
        .header("Cache-Control", "no-cache")
        .header("X-Accel-Buffering", "no")
        .body(Body::from_stream(stream))
        .expect("valid SSE response")
}