forge_core_server/routes/
drafts.rs

1use axum::{
2    Router,
3    extract::{
4        Query, State,
5        ws::{WebSocket, WebSocketUpgrade},
6    },
7    response::IntoResponse,
8    routing::get,
9};
10use forge_core_deployment::Deployment;
11use futures_util::{SinkExt, StreamExt, TryStreamExt};
12use serde::Deserialize;
13use uuid::Uuid;
14
15use crate::DeploymentImpl;
16
17#[derive(Debug, Deserialize)]
18pub struct DraftsQuery {
19    pub project_id: Uuid,
20}
21
22pub async fn stream_project_drafts_ws(
23    ws: WebSocketUpgrade,
24    State(deployment): State<DeploymentImpl>,
25    Query(query): Query<DraftsQuery>,
26) -> impl IntoResponse {
27    ws.on_upgrade(move |socket| async move {
28        if let Err(e) = handle_project_drafts_ws(socket, deployment, query.project_id).await {
29            tracing::warn!("drafts WS closed: {}", e);
30        }
31    })
32}
33
34async fn handle_project_drafts_ws(
35    socket: WebSocket,
36    deployment: DeploymentImpl,
37    project_id: Uuid,
38) -> anyhow::Result<()> {
39    let mut stream = deployment
40        .events()
41        .stream_drafts_for_project_raw(project_id)
42        .await?
43        .map_ok(|msg| msg.to_ws_message_unchecked());
44
45    let (mut sender, mut receiver) = socket.split();
46    tokio::spawn(async move { while let Some(Ok(_)) = receiver.next().await {} });
47
48    while let Some(item) = stream.next().await {
49        match item {
50            Ok(msg) => {
51                if sender.send(msg).await.is_err() {
52                    break;
53                }
54            }
55            Err(e) => {
56                tracing::error!("stream error: {}", e);
57                break;
58            }
59        }
60    }
61    Ok(())
62}
63
64pub fn router(_deployment: &DeploymentImpl) -> Router<DeploymentImpl> {
65    let inner = Router::new().route("/stream/ws", get(stream_project_drafts_ws));
66    Router::new().nest("/drafts", inner)
67}