Skip to main content

routa_server/api/
shared_sessions.rs

1use std::{convert::Infallible, pin::Pin, time::Duration};
2
3use axum::{
4    extract::{Path, Query, State},
5    http::StatusCode,
6    response::sse::{Event, Sse},
7    routing::get,
8    Json, Router,
9};
10use chrono::Utc;
11use serde::Deserialize;
12use serde_json::{json, Value};
13use tokio::sync::broadcast;
14
15use crate::state::AppState;
16
17mod store;
18
19type SharedSseStream = Pin<Box<dyn tokio_stream::Stream<Item = Result<Event, Infallible>> + Send>>;
20
21pub fn router() -> Router<AppState> {
22    Router::new()
23        .route("/", get(list_shared_sessions).post(create_shared_session))
24        .route(
25            "/{shared_session_id}",
26            get(get_shared_session).delete(close_shared_session),
27        )
28        .route(
29            "/{shared_session_id}/join",
30            axum::routing::post(join_shared_session),
31        )
32        .route(
33            "/{shared_session_id}/leave",
34            axum::routing::post(leave_shared_session),
35        )
36        .route(
37            "/{shared_session_id}/participants",
38            get(list_shared_session_participants),
39        )
40        .route(
41            "/{shared_session_id}/messages",
42            get(list_shared_session_messages).post(send_shared_session_message),
43        )
44        .route(
45            "/{shared_session_id}/prompts",
46            axum::routing::post(send_shared_session_prompt),
47        )
48        .route(
49            "/{shared_session_id}/approvals/{approval_id}",
50            axum::routing::post(respond_shared_prompt_approval),
51        )
52        .route("/{shared_session_id}/stream", get(shared_session_stream))
53}
54
55#[derive(Debug, Deserialize)]
56#[serde(rename_all = "camelCase")]
57struct ListSharedSessionsQuery {
58    workspace_id: Option<String>,
59    host_session_id: Option<String>,
60    status: Option<store::SharedSessionStatus>,
61}
62
63async fn list_shared_sessions(
64    Query(query): Query<ListSharedSessionsQuery>,
65) -> store::HandlerResult<Json<Value>> {
66    let mut shared_store = store::shared_session_store().write().await;
67    shared_store.expire_sessions();
68
69    let mut sessions: Vec<store::SharedSession> = shared_store
70        .sessions
71        .values()
72        .filter(|session| {
73            if let Some(ws) = &query.workspace_id {
74                if &session.workspace_id != ws {
75                    return false;
76                }
77            }
78            if let Some(host_session_id) = &query.host_session_id {
79                if &session.host_session_id != host_session_id {
80                    return false;
81                }
82            }
83            if let Some(status) = &query.status {
84                if &session.status != status {
85                    return false;
86                }
87            }
88            true
89        })
90        .cloned()
91        .collect();
92    sessions.sort_by(|a, b| b.created_at.cmp(&a.created_at));
93
94    Ok(Json(json!({
95        "sessions": sessions
96            .iter()
97            .map(|session| store::session_to_json(session, false))
98            .collect::<Vec<_>>(),
99    })))
100}
101
102#[derive(Debug, Deserialize)]
103#[serde(rename_all = "camelCase")]
104struct CreateSharedSessionRequest {
105    host_session_id: String,
106    host_user_id: String,
107    host_display_name: Option<String>,
108    mode: Option<store::SharedSessionMode>,
109    workspace_id: Option<String>,
110    expires_in_minutes: Option<i64>,
111}
112
113async fn create_shared_session(
114    State(state): State<AppState>,
115    Json(body): Json<CreateSharedSessionRequest>,
116) -> store::HandlerResult<(StatusCode, Json<Value>)> {
117    if body.host_session_id.trim().is_empty() {
118        return Err(store::into_http_error(store::ApiErr::bad_request(
119            "MISSING_HOST_SESSION_ID",
120            "hostSessionId is required.",
121        )));
122    }
123    if body.host_user_id.trim().is_empty() {
124        return Err(store::into_http_error(store::ApiErr::bad_request(
125            "MISSING_HOST_USER_ID",
126            "hostUserId is required.",
127        )));
128    }
129
130    let host_session = state
131        .acp_manager
132        .get_session(&body.host_session_id)
133        .await
134        .ok_or_else(|| {
135            store::into_http_error(store::ApiErr::not_found(
136                "HOST_SESSION_NOT_FOUND",
137                format!("Host session not found: {}", body.host_session_id),
138            ))
139        })?;
140
141    let expires_at = body.expires_in_minutes.and_then(|minutes| {
142        if minutes <= 0 {
143            None
144        } else {
145            Some(Utc::now() + chrono::Duration::minutes(minutes))
146        }
147    });
148
149    let mode = body
150        .mode
151        .unwrap_or(store::SharedSessionMode::PromptWithApproval);
152    let session = store::SharedSession {
153        id: uuid::Uuid::new_v4().to_string(),
154        workspace_id: body
155            .workspace_id
156            .unwrap_or_else(|| host_session.workspace_id.clone()),
157        host_user_id: body.host_user_id.trim().to_string(),
158        host_session_id: body.host_session_id.clone(),
159        approval_required: mode == store::SharedSessionMode::PromptWithApproval,
160        mode,
161        invite_token: uuid::Uuid::new_v4().simple().to_string(),
162        created_at: Utc::now(),
163        expires_at,
164        status: store::SharedSessionStatus::Active,
165    };
166    let host_participant = store::SharedSessionParticipant {
167        id: uuid::Uuid::new_v4().to_string(),
168        shared_session_id: session.id.clone(),
169        user_id: body.host_user_id.trim().to_string(),
170        display_name: body.host_display_name.clone(),
171        role: store::SharedSessionRole::Host,
172        access_token: uuid::Uuid::new_v4().simple().to_string(),
173        joined_at: Utc::now(),
174        left_at: None,
175    };
176
177    {
178        let mut shared_store = store::shared_session_store().write().await;
179        shared_store.expire_sessions();
180        shared_store
181            .sessions
182            .insert(session.id.clone(), session.clone());
183        shared_store.insert_participant(host_participant.clone());
184        shared_store.emit_session_event(
185            &session.id,
186            "shared_session_created",
187            json!({ "session": store::session_to_json(&session, false) }),
188        );
189        shared_store.emit_session_event(
190            &session.id,
191            "participant_joined",
192            json!({ "participant": store::participant_to_json(&host_participant, false) }),
193        );
194    }
195
196    if let Some(rx) = state.acp_manager.subscribe(&session.host_session_id).await {
197        store::spawn_host_notification_forwarder(session.id.clone(), rx);
198    }
199
200    Ok((
201        StatusCode::CREATED,
202        Json(json!({
203            "session": store::session_to_json(&session, true),
204            "inviteToken": session.invite_token,
205            "hostParticipant": store::participant_to_json(&host_participant, true),
206        })),
207    ))
208}
209
210async fn get_shared_session(
211    Path(shared_session_id): Path<String>,
212) -> store::HandlerResult<Json<Value>> {
213    let shared_store = store::shared_session_store().read().await;
214    let Some(session) = shared_store.sessions.get(&shared_session_id) else {
215        return Err(store::into_http_error(store::ApiErr::not_found(
216            "SESSION_NOT_FOUND",
217            "Shared session not found.",
218        )));
219    };
220
221    if session.status != store::SharedSessionStatus::Active {
222        return Err(store::into_http_error(store::ApiErr::conflict(
223            "SESSION_INACTIVE",
224            format!("Shared session is {:?}.", session.status),
225        )));
226    }
227
228    let participants = shared_store
229        .list_participants(&shared_session_id)
230        .iter()
231        .map(|participant| store::participant_to_json(participant, false))
232        .collect::<Vec<_>>();
233    let approvals = shared_store
234        .list_approvals(&shared_session_id)
235        .iter()
236        .map(store::approval_to_json)
237        .collect::<Vec<_>>();
238
239    Ok(Json(json!({
240        "session": store::session_to_json(session, false),
241        "participants": participants,
242        "approvals": approvals,
243    })))
244}
245
246#[derive(Debug, Deserialize)]
247#[serde(rename_all = "camelCase")]
248struct ParticipantAuthRequest {
249    participant_id: String,
250    participant_token: String,
251}
252
253async fn close_shared_session(
254    Path(shared_session_id): Path<String>,
255    Json(body): Json<ParticipantAuthRequest>,
256) -> store::HandlerResult<Json<Value>> {
257    let mut shared_store = store::shared_session_store().write().await;
258    shared_store.expire_sessions();
259    let participant = shared_store
260        .authenticate_participant(
261            &shared_session_id,
262            &body.participant_id,
263            &body.participant_token,
264        )
265        .map_err(store::into_http_error)?
266        .clone();
267    if participant.role != store::SharedSessionRole::Host {
268        return Err(store::into_http_error(store::ApiErr::forbidden(
269            "HOST_REQUIRED",
270            "Only host can close the shared session.",
271        )));
272    }
273    shared_store.finalize_session(&shared_session_id, store::SharedSessionStatus::Closed);
274    let session = shared_store
275        .sessions
276        .get(&shared_session_id)
277        .cloned()
278        .ok_or_else(|| {
279            store::into_http_error(store::ApiErr::not_found(
280                "SESSION_NOT_FOUND",
281                "Shared session not found.",
282            ))
283        })?;
284
285    Ok(Json(json!({
286        "closed": true,
287        "session": store::session_to_json(&session, false),
288    })))
289}
290
291#[derive(Debug, Deserialize)]
292#[serde(rename_all = "camelCase")]
293struct JoinSharedSessionRequest {
294    invite_token: String,
295    user_id: String,
296    display_name: Option<String>,
297    role: Option<store::SharedSessionRole>,
298}
299
300async fn join_shared_session(
301    Path(shared_session_id): Path<String>,
302    Json(body): Json<JoinSharedSessionRequest>,
303) -> store::HandlerResult<Json<Value>> {
304    if body.user_id.trim().is_empty() {
305        return Err(store::into_http_error(store::ApiErr::bad_request(
306            "MISSING_USER_ID",
307            "userId is required.",
308        )));
309    }
310
311    let mut shared_store = store::shared_session_store().write().await;
312    shared_store.expire_sessions();
313    let session = shared_store
314        .ensure_active_session(&shared_session_id)
315        .map_err(store::into_http_error)?
316        .clone();
317
318    if session.invite_token != body.invite_token {
319        return Err(store::into_http_error(store::ApiErr::forbidden(
320            "INVALID_INVITE_TOKEN",
321            "Invite token is invalid.",
322        )));
323    }
324
325    if let Some(existing) =
326        shared_store.find_active_participant_by_user_id(&shared_session_id, &body.user_id)
327    {
328        return Ok(Json(json!({
329            "session": store::session_to_json(&session, false),
330            "participant": store::participant_to_json(&existing, true),
331        })));
332    }
333
334    let role = match body.role.unwrap_or(store::SharedSessionRole::Collaborator) {
335        store::SharedSessionRole::Host => store::SharedSessionRole::Collaborator,
336        value => value,
337    };
338    let participant = store::SharedSessionParticipant {
339        id: uuid::Uuid::new_v4().to_string(),
340        shared_session_id: shared_session_id.clone(),
341        user_id: body.user_id.trim().to_string(),
342        display_name: body.display_name.clone(),
343        role,
344        access_token: uuid::Uuid::new_v4().simple().to_string(),
345        joined_at: Utc::now(),
346        left_at: None,
347    };
348    shared_store.insert_participant(participant.clone());
349    shared_store.emit_session_event(
350        &shared_session_id,
351        "participant_joined",
352        json!({ "participant": store::participant_to_json(&participant, false) }),
353    );
354
355    Ok(Json(json!({
356        "session": store::session_to_json(&session, false),
357        "participant": store::participant_to_json(&participant, true),
358    })))
359}
360
361async fn leave_shared_session(
362    Path(shared_session_id): Path<String>,
363    Json(body): Json<ParticipantAuthRequest>,
364) -> store::HandlerResult<Json<Value>> {
365    let mut shared_store = store::shared_session_store().write().await;
366    shared_store.expire_sessions();
367    let mut participant = shared_store
368        .authenticate_participant(
369            &shared_session_id,
370            &body.participant_id,
371            &body.participant_token,
372        )
373        .map_err(store::into_http_error)?
374        .clone();
375
376    if participant.left_at.is_none() {
377        participant.left_at = Some(Utc::now());
378        shared_store
379            .participants
380            .insert(participant.id.clone(), participant.clone());
381        shared_store.emit_session_event(
382            &shared_session_id,
383            "participant_left",
384            json!({ "participant": store::participant_to_json(&participant, false) }),
385        );
386    }
387
388    Ok(Json(json!({
389        "participant": store::participant_to_json(&participant, false),
390    })))
391}
392
393async fn list_shared_session_participants(
394    Path(shared_session_id): Path<String>,
395) -> store::HandlerResult<Json<Value>> {
396    let mut shared_store = store::shared_session_store().write().await;
397    shared_store.expire_sessions();
398    shared_store
399        .ensure_active_session(&shared_session_id)
400        .map_err(store::into_http_error)?;
401
402    let participants = shared_store
403        .list_participants(&shared_session_id)
404        .iter()
405        .map(|participant| store::participant_to_json(participant, false))
406        .collect::<Vec<_>>();
407
408    Ok(Json(json!({ "participants": participants })))
409}
410
411async fn list_shared_session_messages(
412    Path(shared_session_id): Path<String>,
413) -> store::HandlerResult<Json<Value>> {
414    let mut shared_store = store::shared_session_store().write().await;
415    shared_store.expire_sessions();
416    shared_store
417        .ensure_active_session(&shared_session_id)
418        .map_err(store::into_http_error)?;
419
420    let messages = shared_store
421        .list_messages(&shared_session_id)
422        .iter()
423        .map(store::message_to_json)
424        .collect::<Vec<_>>();
425
426    Ok(Json(json!({ "messages": messages })))
427}
428
429#[derive(Debug, Deserialize)]
430#[serde(rename_all = "camelCase")]
431struct SendMessageRequest {
432    participant_id: String,
433    participant_token: String,
434    text: String,
435}
436
437async fn send_shared_session_message(
438    Path(shared_session_id): Path<String>,
439    Json(body): Json<SendMessageRequest>,
440) -> store::HandlerResult<Json<Value>> {
441    let text = body.text.trim();
442    if text.is_empty() {
443        return Err(store::into_http_error(store::ApiErr::bad_request(
444            "EMPTY_MESSAGE",
445            "Message text cannot be empty.",
446        )));
447    }
448
449    let mut shared_store = store::shared_session_store().write().await;
450    shared_store.expire_sessions();
451    let session = shared_store
452        .ensure_active_session(&shared_session_id)
453        .map_err(store::into_http_error)?
454        .clone();
455    let participant = shared_store
456        .authenticate_participant(
457            &shared_session_id,
458            &body.participant_id,
459            &body.participant_token,
460        )
461        .map_err(store::into_http_error)?
462        .clone();
463
464    if !store::can_comment(&session.mode, &participant.role) {
465        return Err(store::into_http_error(store::ApiErr::forbidden(
466            "COMMENT_NOT_ALLOWED",
467            "Message sending is not allowed in current mode.",
468        )));
469    }
470
471    let message = store::SharedSessionMessage {
472        id: uuid::Uuid::new_v4().to_string(),
473        shared_session_id: shared_session_id.clone(),
474        participant_id: participant.id.clone(),
475        author_user_id: participant.user_id.clone(),
476        kind: "comment".to_string(),
477        text: text.to_string(),
478        created_at: Utc::now(),
479        approval_id: None,
480    };
481    shared_store.append_message(message.clone());
482    shared_store.emit_session_event(
483        &shared_session_id,
484        "message_created",
485        json!({
486            "message": store::message_to_json(&message),
487            "participant": store::participant_to_json(&participant, false),
488        }),
489    );
490
491    Ok(Json(json!({
492        "message": store::message_to_json(&message),
493    })))
494}
495
496#[derive(Debug, Deserialize)]
497#[serde(rename_all = "camelCase")]
498struct SendPromptRequest {
499    participant_id: String,
500    participant_token: String,
501    prompt: String,
502}
503
504async fn send_shared_session_prompt(
505    State(state): State<AppState>,
506    Path(shared_session_id): Path<String>,
507    Json(body): Json<SendPromptRequest>,
508) -> store::HandlerResult<Json<Value>> {
509    let prompt = body.prompt.trim();
510    if prompt.is_empty() {
511        return Err(store::into_http_error(store::ApiErr::bad_request(
512            "EMPTY_PROMPT",
513            "Prompt cannot be empty.",
514        )));
515    }
516
517    let (response_status, response_approval, dispatch) = {
518        let mut shared_store = store::shared_session_store().write().await;
519        shared_store.expire_sessions();
520
521        let session = shared_store
522            .ensure_active_session(&shared_session_id)
523            .map_err(store::into_http_error)?
524            .clone();
525        let participant = shared_store
526            .authenticate_participant(
527                &shared_session_id,
528                &body.participant_id,
529                &body.participant_token,
530            )
531            .map_err(store::into_http_error)?
532            .clone();
533
534        if !store::can_prompt(&session.mode, &participant.role) {
535            return Err(store::into_http_error(store::ApiErr::forbidden(
536                "PROMPT_NOT_ALLOWED",
537                "Prompt sending is not allowed in current mode.",
538            )));
539        }
540
541        if session.mode == store::SharedSessionMode::PromptWithApproval
542            && participant.role != store::SharedSessionRole::Host
543        {
544            let approval = store::SharedPromptApproval {
545                id: uuid::Uuid::new_v4().to_string(),
546                shared_session_id: shared_session_id.clone(),
547                participant_id: participant.id.clone(),
548                prompt: prompt.to_string(),
549                status: store::SharedPromptStatus::Pending,
550                created_at: Utc::now(),
551                resolved_at: None,
552                resolved_by_participant_id: None,
553                error_message: None,
554            };
555            shared_store.append_approval(approval.clone());
556            shared_store.append_message(store::SharedSessionMessage {
557                id: uuid::Uuid::new_v4().to_string(),
558                shared_session_id: shared_session_id.clone(),
559                participant_id: participant.id.clone(),
560                author_user_id: participant.user_id.clone(),
561                kind: "prompt".to_string(),
562                text: prompt.to_string(),
563                created_at: Utc::now(),
564                approval_id: Some(approval.id.clone()),
565            });
566            shared_store.emit_session_event(
567                &shared_session_id,
568                "prompt_pending_approval",
569                json!({
570                    "approval": store::approval_to_json(&approval),
571                    "participant": store::participant_to_json(&participant, false),
572                }),
573            );
574            (store::SharedPromptStatus::Pending, Some(approval), None)
575        } else {
576            let approval = store::SharedPromptApproval {
577                id: uuid::Uuid::new_v4().to_string(),
578                shared_session_id: shared_session_id.clone(),
579                participant_id: participant.id.clone(),
580                prompt: prompt.to_string(),
581                status: store::SharedPromptStatus::Approved,
582                created_at: Utc::now(),
583                resolved_at: Some(Utc::now()),
584                resolved_by_participant_id: Some(participant.id.clone()),
585                error_message: None,
586            };
587            shared_store.append_approval(approval.clone());
588            shared_store.emit_session_event(
589                &shared_session_id,
590                "prompt_approved",
591                json!({
592                    "approval": store::approval_to_json(&approval),
593                    "participant": store::participant_to_json(&participant, false),
594                }),
595            );
596            (
597                store::SharedPromptStatus::Approved,
598                Some(approval.clone()),
599                Some(store::PromptDispatchRequest {
600                    shared_session_id: shared_session_id.clone(),
601                    host_session_id: session.host_session_id.clone(),
602                    approval_id: approval.id.clone(),
603                    prompt: approval.prompt.clone(),
604                }),
605            )
606        }
607    };
608
609    if let Some(dispatch_request) = dispatch {
610        tokio::spawn(store::dispatch_shared_prompt(
611            state.clone(),
612            dispatch_request,
613        ));
614    }
615
616    Ok(Json(json!({
617        "status": store::to_prompt_status_value(&response_status),
618        "approval": response_approval.as_ref().map(store::approval_to_json),
619    })))
620}
621
622#[derive(Debug, Deserialize)]
623#[serde(rename_all = "camelCase")]
624struct RespondApprovalRequest {
625    participant_id: String,
626    participant_token: String,
627    action: String,
628}
629
630async fn respond_shared_prompt_approval(
631    State(state): State<AppState>,
632    Path((shared_session_id, approval_id)): Path<(String, String)>,
633    Json(body): Json<RespondApprovalRequest>,
634) -> store::HandlerResult<Json<Value>> {
635    if body.action != "approve" && body.action != "reject" {
636        return Err(store::into_http_error(store::ApiErr::bad_request(
637            "INVALID_ACTION",
638            "action must be approve or reject.",
639        )));
640    }
641
642    let mut dispatch: Option<store::PromptDispatchRequest> = None;
643    let approval: store::SharedPromptApproval;
644
645    {
646        let mut shared_store = store::shared_session_store().write().await;
647        shared_store.expire_sessions();
648        let session = shared_store
649            .ensure_active_session(&shared_session_id)
650            .map_err(store::into_http_error)?
651            .clone();
652        let host_participant = shared_store
653            .authenticate_participant(
654                &shared_session_id,
655                &body.participant_id,
656                &body.participant_token,
657            )
658            .map_err(store::into_http_error)?
659            .clone();
660
661        if host_participant.role != store::SharedSessionRole::Host {
662            return Err(store::into_http_error(store::ApiErr::forbidden(
663                "HOST_REQUIRED",
664                "Only host can approve or reject prompts.",
665            )));
666        }
667
668        let approval_snapshot = {
669            let approval_entry = shared_store
670                .approvals
671                .get_mut(&approval_id)
672                .ok_or_else(|| {
673                    store::into_http_error(store::ApiErr::not_found(
674                        "APPROVAL_NOT_FOUND",
675                        "Approval request not found.",
676                    ))
677                })?;
678            if approval_entry.shared_session_id != shared_session_id {
679                return Err(store::into_http_error(store::ApiErr::not_found(
680                    "APPROVAL_NOT_FOUND",
681                    "Approval request not found.",
682                )));
683            }
684            if approval_entry.status != store::SharedPromptStatus::Pending {
685                return Err(store::into_http_error(store::ApiErr::conflict(
686                    "APPROVAL_ALREADY_RESOLVED",
687                    "Approval has already been resolved.",
688                )));
689            }
690            approval_entry.resolved_at = Some(Utc::now());
691            approval_entry.resolved_by_participant_id = Some(host_participant.id.clone());
692            if body.action == "reject" {
693                approval_entry.status = store::SharedPromptStatus::Rejected;
694            } else {
695                approval_entry.status = store::SharedPromptStatus::Approved;
696            }
697            approval_entry.clone()
698        };
699
700        if body.action == "reject" {
701            shared_store.emit_session_event(
702                &shared_session_id,
703                "prompt_rejected",
704                json!({
705                    "approval": store::approval_to_json(&approval_snapshot),
706                    "participant": store::participant_to_json(&host_participant, false),
707                }),
708            );
709        } else {
710            shared_store.emit_session_event(
711                &shared_session_id,
712                "prompt_approved",
713                json!({
714                    "approval": store::approval_to_json(&approval_snapshot),
715                    "participant": store::participant_to_json(&host_participant, false),
716                }),
717            );
718            dispatch = Some(store::PromptDispatchRequest {
719                shared_session_id: shared_session_id.clone(),
720                host_session_id: session.host_session_id.clone(),
721                approval_id: approval_snapshot.id.clone(),
722                prompt: approval_snapshot.prompt.clone(),
723            });
724        }
725
726        approval = approval_snapshot;
727    }
728
729    if let Some(dispatch_request) = dispatch {
730        tokio::spawn(store::dispatch_shared_prompt(
731            state.clone(),
732            dispatch_request,
733        ));
734    }
735
736    Ok(Json(json!({
737        "approval": store::approval_to_json(&approval),
738    })))
739}
740
741#[derive(Debug, Deserialize)]
742#[serde(rename_all = "camelCase")]
743struct SharedSessionStreamQuery {
744    participant_id: String,
745    participant_token: String,
746}
747
748async fn shared_session_stream(
749    Path(shared_session_id): Path<String>,
750    Query(query): Query<SharedSessionStreamQuery>,
751) -> store::HandlerResult<Sse<SharedSseStream>> {
752    let mut shared_store = store::shared_session_store().write().await;
753    shared_store.expire_sessions();
754    shared_store
755        .authenticate_participant(
756            &shared_session_id,
757            &query.participant_id,
758            &query.participant_token,
759        )
760        .map_err(store::into_http_error)?;
761    let mut rx = shared_store.subscribe(&shared_session_id).ok_or_else(|| {
762        store::into_http_error(store::ApiErr::not_found(
763            "SESSION_NOT_FOUND",
764            "Shared session not found.",
765        ))
766    })?;
767    drop(shared_store);
768
769    let connected = json!({
770        "type": "connected",
771        "sharedSessionId": shared_session_id,
772        "timestamp": Utc::now().to_rfc3339(),
773    });
774    let stream: SharedSseStream = Box::pin(async_stream::stream! {
775        yield Ok(Event::default().data(connected.to_string()));
776        let mut heartbeat = tokio::time::interval(Duration::from_secs(15));
777        loop {
778            tokio::select! {
779                msg = rx.recv() => {
780                    match msg {
781                        Ok(event) => yield Ok(Event::default().data(event.to_string())),
782                        Err(broadcast::error::RecvError::Lagged(_)) => continue,
783                        Err(broadcast::error::RecvError::Closed) => break,
784                    }
785                }
786                _ = heartbeat.tick() => yield Ok(Event::default().comment("heartbeat")),
787            }
788        }
789    });
790
791    Ok(Sse::new(stream))
792}