forge_core_server/routes/
drafts.rs1use axum::{
2 Router,
3 extract::{
4 Query, State,
5 ws::{WebSocket, WebSocketUpgrade},
6 },
7 response::IntoResponse,
8 routing::get,
9};
10use forge_core_deployment::Deployment;
11use futures_util::{SinkExt, StreamExt, TryStreamExt};
12use serde::Deserialize;
13use uuid::Uuid;
14
15use crate::DeploymentImpl;
16
17#[derive(Debug, Deserialize)]
18pub struct DraftsQuery {
19 pub project_id: Uuid,
20}
21
22pub async fn stream_project_drafts_ws(
23 ws: WebSocketUpgrade,
24 State(deployment): State<DeploymentImpl>,
25 Query(query): Query<DraftsQuery>,
26) -> impl IntoResponse {
27 ws.on_upgrade(move |socket| async move {
28 if let Err(e) = handle_project_drafts_ws(socket, deployment, query.project_id).await {
29 tracing::warn!("drafts WS closed: {}", e);
30 }
31 })
32}
33
34async fn handle_project_drafts_ws(
35 socket: WebSocket,
36 deployment: DeploymentImpl,
37 project_id: Uuid,
38) -> anyhow::Result<()> {
39 let mut stream = deployment
40 .events()
41 .stream_drafts_for_project_raw(project_id)
42 .await?
43 .map_ok(|msg| msg.to_ws_message_unchecked());
44
45 let (mut sender, mut receiver) = socket.split();
46 tokio::spawn(async move { while let Some(Ok(_)) = receiver.next().await {} });
47
48 while let Some(item) = stream.next().await {
49 match item {
50 Ok(msg) => {
51 if sender.send(msg).await.is_err() {
52 break;
53 }
54 }
55 Err(e) => {
56 tracing::error!("stream error: {}", e);
57 break;
58 }
59 }
60 }
61 Ok(())
62}
63
64pub fn router(_deployment: &DeploymentImpl) -> Router<DeploymentImpl> {
65 let inner = Router::new().route("/stream/ws", get(stream_project_drafts_ws));
66 Router::new().nest("/drafts", inner)
67}