1use std::{convert::Infallible, pin::Pin, time::Duration};
2
3use axum::{
4 extract::{Path, Query, State},
5 http::StatusCode,
6 response::sse::{Event, Sse},
7 routing::get,
8 Json, Router,
9};
10use chrono::Utc;
11use serde::Deserialize;
12use serde_json::{json, Value};
13use tokio::sync::broadcast;
14
15use crate::state::AppState;
16
17mod store;
18
19type SharedSseStream = Pin<Box<dyn tokio_stream::Stream<Item = Result<Event, Infallible>> + Send>>;
20
21pub fn router() -> Router<AppState> {
22 Router::new()
23 .route("/", get(list_shared_sessions).post(create_shared_session))
24 .route(
25 "/{shared_session_id}",
26 get(get_shared_session).delete(close_shared_session),
27 )
28 .route(
29 "/{shared_session_id}/join",
30 axum::routing::post(join_shared_session),
31 )
32 .route(
33 "/{shared_session_id}/leave",
34 axum::routing::post(leave_shared_session),
35 )
36 .route(
37 "/{shared_session_id}/participants",
38 get(list_shared_session_participants),
39 )
40 .route(
41 "/{shared_session_id}/messages",
42 get(list_shared_session_messages).post(send_shared_session_message),
43 )
44 .route(
45 "/{shared_session_id}/prompts",
46 axum::routing::post(send_shared_session_prompt),
47 )
48 .route(
49 "/{shared_session_id}/approvals/{approval_id}",
50 axum::routing::post(respond_shared_prompt_approval),
51 )
52 .route("/{shared_session_id}/stream", get(shared_session_stream))
53}
54
55#[derive(Debug, Deserialize)]
56#[serde(rename_all = "camelCase")]
57struct ListSharedSessionsQuery {
58 workspace_id: Option<String>,
59 host_session_id: Option<String>,
60 status: Option<store::SharedSessionStatus>,
61}
62
63async fn list_shared_sessions(
64 Query(query): Query<ListSharedSessionsQuery>,
65) -> store::HandlerResult<Json<Value>> {
66 let mut shared_store = store::shared_session_store().write().await;
67 shared_store.expire_sessions();
68
69 let mut sessions: Vec<store::SharedSession> = shared_store
70 .sessions
71 .values()
72 .filter(|session| {
73 if let Some(ws) = &query.workspace_id {
74 if &session.workspace_id != ws {
75 return false;
76 }
77 }
78 if let Some(host_session_id) = &query.host_session_id {
79 if &session.host_session_id != host_session_id {
80 return false;
81 }
82 }
83 if let Some(status) = &query.status {
84 if &session.status != status {
85 return false;
86 }
87 }
88 true
89 })
90 .cloned()
91 .collect();
92 sessions.sort_by(|a, b| b.created_at.cmp(&a.created_at));
93
94 Ok(Json(json!({
95 "sessions": sessions
96 .iter()
97 .map(|session| store::session_to_json(session, false))
98 .collect::<Vec<_>>(),
99 })))
100}
101
102#[derive(Debug, Deserialize)]
103#[serde(rename_all = "camelCase")]
104struct CreateSharedSessionRequest {
105 host_session_id: String,
106 host_user_id: String,
107 host_display_name: Option<String>,
108 mode: Option<store::SharedSessionMode>,
109 workspace_id: Option<String>,
110 expires_in_minutes: Option<i64>,
111}
112
113async fn create_shared_session(
114 State(state): State<AppState>,
115 Json(body): Json<CreateSharedSessionRequest>,
116) -> store::HandlerResult<(StatusCode, Json<Value>)> {
117 if body.host_session_id.trim().is_empty() {
118 return Err(store::into_http_error(store::ApiErr::bad_request(
119 "MISSING_HOST_SESSION_ID",
120 "hostSessionId is required.",
121 )));
122 }
123 if body.host_user_id.trim().is_empty() {
124 return Err(store::into_http_error(store::ApiErr::bad_request(
125 "MISSING_HOST_USER_ID",
126 "hostUserId is required.",
127 )));
128 }
129
130 let host_session = state
131 .acp_manager
132 .get_session(&body.host_session_id)
133 .await
134 .ok_or_else(|| {
135 store::into_http_error(store::ApiErr::not_found(
136 "HOST_SESSION_NOT_FOUND",
137 format!("Host session not found: {}", body.host_session_id),
138 ))
139 })?;
140
141 let expires_at = body.expires_in_minutes.and_then(|minutes| {
142 if minutes <= 0 {
143 None
144 } else {
145 Some(Utc::now() + chrono::Duration::minutes(minutes))
146 }
147 });
148
149 let mode = body
150 .mode
151 .unwrap_or(store::SharedSessionMode::PromptWithApproval);
152 let session = store::SharedSession {
153 id: uuid::Uuid::new_v4().to_string(),
154 workspace_id: body
155 .workspace_id
156 .unwrap_or_else(|| host_session.workspace_id.clone()),
157 host_user_id: body.host_user_id.trim().to_string(),
158 host_session_id: body.host_session_id.clone(),
159 approval_required: mode == store::SharedSessionMode::PromptWithApproval,
160 mode,
161 invite_token: uuid::Uuid::new_v4().simple().to_string(),
162 created_at: Utc::now(),
163 expires_at,
164 status: store::SharedSessionStatus::Active,
165 };
166 let host_participant = store::SharedSessionParticipant {
167 id: uuid::Uuid::new_v4().to_string(),
168 shared_session_id: session.id.clone(),
169 user_id: body.host_user_id.trim().to_string(),
170 display_name: body.host_display_name.clone(),
171 role: store::SharedSessionRole::Host,
172 access_token: uuid::Uuid::new_v4().simple().to_string(),
173 joined_at: Utc::now(),
174 left_at: None,
175 };
176
177 {
178 let mut shared_store = store::shared_session_store().write().await;
179 shared_store.expire_sessions();
180 shared_store
181 .sessions
182 .insert(session.id.clone(), session.clone());
183 shared_store.insert_participant(host_participant.clone());
184 shared_store.emit_session_event(
185 &session.id,
186 "shared_session_created",
187 json!({ "session": store::session_to_json(&session, false) }),
188 );
189 shared_store.emit_session_event(
190 &session.id,
191 "participant_joined",
192 json!({ "participant": store::participant_to_json(&host_participant, false) }),
193 );
194 }
195
196 if let Some(rx) = state.acp_manager.subscribe(&session.host_session_id).await {
197 store::spawn_host_notification_forwarder(session.id.clone(), rx);
198 }
199
200 Ok((
201 StatusCode::CREATED,
202 Json(json!({
203 "session": store::session_to_json(&session, true),
204 "inviteToken": session.invite_token,
205 "hostParticipant": store::participant_to_json(&host_participant, true),
206 })),
207 ))
208}
209
210async fn get_shared_session(
211 Path(shared_session_id): Path<String>,
212) -> store::HandlerResult<Json<Value>> {
213 let shared_store = store::shared_session_store().read().await;
214 let Some(session) = shared_store.sessions.get(&shared_session_id) else {
215 return Err(store::into_http_error(store::ApiErr::not_found(
216 "SESSION_NOT_FOUND",
217 "Shared session not found.",
218 )));
219 };
220
221 if session.status != store::SharedSessionStatus::Active {
222 return Err(store::into_http_error(store::ApiErr::conflict(
223 "SESSION_INACTIVE",
224 format!("Shared session is {:?}.", session.status),
225 )));
226 }
227
228 let participants = shared_store
229 .list_participants(&shared_session_id)
230 .iter()
231 .map(|participant| store::participant_to_json(participant, false))
232 .collect::<Vec<_>>();
233 let approvals = shared_store
234 .list_approvals(&shared_session_id)
235 .iter()
236 .map(store::approval_to_json)
237 .collect::<Vec<_>>();
238
239 Ok(Json(json!({
240 "session": store::session_to_json(session, false),
241 "participants": participants,
242 "approvals": approvals,
243 })))
244}
245
246#[derive(Debug, Deserialize)]
247#[serde(rename_all = "camelCase")]
248struct ParticipantAuthRequest {
249 participant_id: String,
250 participant_token: String,
251}
252
253async fn close_shared_session(
254 Path(shared_session_id): Path<String>,
255 Json(body): Json<ParticipantAuthRequest>,
256) -> store::HandlerResult<Json<Value>> {
257 let mut shared_store = store::shared_session_store().write().await;
258 shared_store.expire_sessions();
259 let participant = shared_store
260 .authenticate_participant(
261 &shared_session_id,
262 &body.participant_id,
263 &body.participant_token,
264 )
265 .map_err(store::into_http_error)?
266 .clone();
267 if participant.role != store::SharedSessionRole::Host {
268 return Err(store::into_http_error(store::ApiErr::forbidden(
269 "HOST_REQUIRED",
270 "Only host can close the shared session.",
271 )));
272 }
273 shared_store.finalize_session(&shared_session_id, store::SharedSessionStatus::Closed);
274 let session = shared_store
275 .sessions
276 .get(&shared_session_id)
277 .cloned()
278 .ok_or_else(|| {
279 store::into_http_error(store::ApiErr::not_found(
280 "SESSION_NOT_FOUND",
281 "Shared session not found.",
282 ))
283 })?;
284
285 Ok(Json(json!({
286 "closed": true,
287 "session": store::session_to_json(&session, false),
288 })))
289}
290
291#[derive(Debug, Deserialize)]
292#[serde(rename_all = "camelCase")]
293struct JoinSharedSessionRequest {
294 invite_token: String,
295 user_id: String,
296 display_name: Option<String>,
297 role: Option<store::SharedSessionRole>,
298}
299
300async fn join_shared_session(
301 Path(shared_session_id): Path<String>,
302 Json(body): Json<JoinSharedSessionRequest>,
303) -> store::HandlerResult<Json<Value>> {
304 if body.user_id.trim().is_empty() {
305 return Err(store::into_http_error(store::ApiErr::bad_request(
306 "MISSING_USER_ID",
307 "userId is required.",
308 )));
309 }
310
311 let mut shared_store = store::shared_session_store().write().await;
312 shared_store.expire_sessions();
313 let session = shared_store
314 .ensure_active_session(&shared_session_id)
315 .map_err(store::into_http_error)?
316 .clone();
317
318 if session.invite_token != body.invite_token {
319 return Err(store::into_http_error(store::ApiErr::forbidden(
320 "INVALID_INVITE_TOKEN",
321 "Invite token is invalid.",
322 )));
323 }
324
325 if let Some(existing) =
326 shared_store.find_active_participant_by_user_id(&shared_session_id, &body.user_id)
327 {
328 return Ok(Json(json!({
329 "session": store::session_to_json(&session, false),
330 "participant": store::participant_to_json(&existing, true),
331 })));
332 }
333
334 let role = match body.role.unwrap_or(store::SharedSessionRole::Collaborator) {
335 store::SharedSessionRole::Host => store::SharedSessionRole::Collaborator,
336 value => value,
337 };
338 let participant = store::SharedSessionParticipant {
339 id: uuid::Uuid::new_v4().to_string(),
340 shared_session_id: shared_session_id.clone(),
341 user_id: body.user_id.trim().to_string(),
342 display_name: body.display_name.clone(),
343 role,
344 access_token: uuid::Uuid::new_v4().simple().to_string(),
345 joined_at: Utc::now(),
346 left_at: None,
347 };
348 shared_store.insert_participant(participant.clone());
349 shared_store.emit_session_event(
350 &shared_session_id,
351 "participant_joined",
352 json!({ "participant": store::participant_to_json(&participant, false) }),
353 );
354
355 Ok(Json(json!({
356 "session": store::session_to_json(&session, false),
357 "participant": store::participant_to_json(&participant, true),
358 })))
359}
360
361async fn leave_shared_session(
362 Path(shared_session_id): Path<String>,
363 Json(body): Json<ParticipantAuthRequest>,
364) -> store::HandlerResult<Json<Value>> {
365 let mut shared_store = store::shared_session_store().write().await;
366 shared_store.expire_sessions();
367 let mut participant = shared_store
368 .authenticate_participant(
369 &shared_session_id,
370 &body.participant_id,
371 &body.participant_token,
372 )
373 .map_err(store::into_http_error)?
374 .clone();
375
376 if participant.left_at.is_none() {
377 participant.left_at = Some(Utc::now());
378 shared_store
379 .participants
380 .insert(participant.id.clone(), participant.clone());
381 shared_store.emit_session_event(
382 &shared_session_id,
383 "participant_left",
384 json!({ "participant": store::participant_to_json(&participant, false) }),
385 );
386 }
387
388 Ok(Json(json!({
389 "participant": store::participant_to_json(&participant, false),
390 })))
391}
392
393async fn list_shared_session_participants(
394 Path(shared_session_id): Path<String>,
395) -> store::HandlerResult<Json<Value>> {
396 let mut shared_store = store::shared_session_store().write().await;
397 shared_store.expire_sessions();
398 shared_store
399 .ensure_active_session(&shared_session_id)
400 .map_err(store::into_http_error)?;
401
402 let participants = shared_store
403 .list_participants(&shared_session_id)
404 .iter()
405 .map(|participant| store::participant_to_json(participant, false))
406 .collect::<Vec<_>>();
407
408 Ok(Json(json!({ "participants": participants })))
409}
410
411async fn list_shared_session_messages(
412 Path(shared_session_id): Path<String>,
413) -> store::HandlerResult<Json<Value>> {
414 let mut shared_store = store::shared_session_store().write().await;
415 shared_store.expire_sessions();
416 shared_store
417 .ensure_active_session(&shared_session_id)
418 .map_err(store::into_http_error)?;
419
420 let messages = shared_store
421 .list_messages(&shared_session_id)
422 .iter()
423 .map(store::message_to_json)
424 .collect::<Vec<_>>();
425
426 Ok(Json(json!({ "messages": messages })))
427}
428
429#[derive(Debug, Deserialize)]
430#[serde(rename_all = "camelCase")]
431struct SendMessageRequest {
432 participant_id: String,
433 participant_token: String,
434 text: String,
435}
436
437async fn send_shared_session_message(
438 Path(shared_session_id): Path<String>,
439 Json(body): Json<SendMessageRequest>,
440) -> store::HandlerResult<Json<Value>> {
441 let text = body.text.trim();
442 if text.is_empty() {
443 return Err(store::into_http_error(store::ApiErr::bad_request(
444 "EMPTY_MESSAGE",
445 "Message text cannot be empty.",
446 )));
447 }
448
449 let mut shared_store = store::shared_session_store().write().await;
450 shared_store.expire_sessions();
451 let session = shared_store
452 .ensure_active_session(&shared_session_id)
453 .map_err(store::into_http_error)?
454 .clone();
455 let participant = shared_store
456 .authenticate_participant(
457 &shared_session_id,
458 &body.participant_id,
459 &body.participant_token,
460 )
461 .map_err(store::into_http_error)?
462 .clone();
463
464 if !store::can_comment(&session.mode, &participant.role) {
465 return Err(store::into_http_error(store::ApiErr::forbidden(
466 "COMMENT_NOT_ALLOWED",
467 "Message sending is not allowed in current mode.",
468 )));
469 }
470
471 let message = store::SharedSessionMessage {
472 id: uuid::Uuid::new_v4().to_string(),
473 shared_session_id: shared_session_id.clone(),
474 participant_id: participant.id.clone(),
475 author_user_id: participant.user_id.clone(),
476 kind: "comment".to_string(),
477 text: text.to_string(),
478 created_at: Utc::now(),
479 approval_id: None,
480 };
481 shared_store.append_message(message.clone());
482 shared_store.emit_session_event(
483 &shared_session_id,
484 "message_created",
485 json!({
486 "message": store::message_to_json(&message),
487 "participant": store::participant_to_json(&participant, false),
488 }),
489 );
490
491 Ok(Json(json!({
492 "message": store::message_to_json(&message),
493 })))
494}
495
496#[derive(Debug, Deserialize)]
497#[serde(rename_all = "camelCase")]
498struct SendPromptRequest {
499 participant_id: String,
500 participant_token: String,
501 prompt: String,
502}
503
504async fn send_shared_session_prompt(
505 State(state): State<AppState>,
506 Path(shared_session_id): Path<String>,
507 Json(body): Json<SendPromptRequest>,
508) -> store::HandlerResult<Json<Value>> {
509 let prompt = body.prompt.trim();
510 if prompt.is_empty() {
511 return Err(store::into_http_error(store::ApiErr::bad_request(
512 "EMPTY_PROMPT",
513 "Prompt cannot be empty.",
514 )));
515 }
516
517 let (response_status, response_approval, dispatch) = {
518 let mut shared_store = store::shared_session_store().write().await;
519 shared_store.expire_sessions();
520
521 let session = shared_store
522 .ensure_active_session(&shared_session_id)
523 .map_err(store::into_http_error)?
524 .clone();
525 let participant = shared_store
526 .authenticate_participant(
527 &shared_session_id,
528 &body.participant_id,
529 &body.participant_token,
530 )
531 .map_err(store::into_http_error)?
532 .clone();
533
534 if !store::can_prompt(&session.mode, &participant.role) {
535 return Err(store::into_http_error(store::ApiErr::forbidden(
536 "PROMPT_NOT_ALLOWED",
537 "Prompt sending is not allowed in current mode.",
538 )));
539 }
540
541 if session.mode == store::SharedSessionMode::PromptWithApproval
542 && participant.role != store::SharedSessionRole::Host
543 {
544 let approval = store::SharedPromptApproval {
545 id: uuid::Uuid::new_v4().to_string(),
546 shared_session_id: shared_session_id.clone(),
547 participant_id: participant.id.clone(),
548 prompt: prompt.to_string(),
549 status: store::SharedPromptStatus::Pending,
550 created_at: Utc::now(),
551 resolved_at: None,
552 resolved_by_participant_id: None,
553 error_message: None,
554 };
555 shared_store.append_approval(approval.clone());
556 shared_store.append_message(store::SharedSessionMessage {
557 id: uuid::Uuid::new_v4().to_string(),
558 shared_session_id: shared_session_id.clone(),
559 participant_id: participant.id.clone(),
560 author_user_id: participant.user_id.clone(),
561 kind: "prompt".to_string(),
562 text: prompt.to_string(),
563 created_at: Utc::now(),
564 approval_id: Some(approval.id.clone()),
565 });
566 shared_store.emit_session_event(
567 &shared_session_id,
568 "prompt_pending_approval",
569 json!({
570 "approval": store::approval_to_json(&approval),
571 "participant": store::participant_to_json(&participant, false),
572 }),
573 );
574 (store::SharedPromptStatus::Pending, Some(approval), None)
575 } else {
576 let approval = store::SharedPromptApproval {
577 id: uuid::Uuid::new_v4().to_string(),
578 shared_session_id: shared_session_id.clone(),
579 participant_id: participant.id.clone(),
580 prompt: prompt.to_string(),
581 status: store::SharedPromptStatus::Approved,
582 created_at: Utc::now(),
583 resolved_at: Some(Utc::now()),
584 resolved_by_participant_id: Some(participant.id.clone()),
585 error_message: None,
586 };
587 shared_store.append_approval(approval.clone());
588 shared_store.emit_session_event(
589 &shared_session_id,
590 "prompt_approved",
591 json!({
592 "approval": store::approval_to_json(&approval),
593 "participant": store::participant_to_json(&participant, false),
594 }),
595 );
596 (
597 store::SharedPromptStatus::Approved,
598 Some(approval.clone()),
599 Some(store::PromptDispatchRequest {
600 shared_session_id: shared_session_id.clone(),
601 host_session_id: session.host_session_id.clone(),
602 approval_id: approval.id.clone(),
603 prompt: approval.prompt.clone(),
604 }),
605 )
606 }
607 };
608
609 if let Some(dispatch_request) = dispatch {
610 tokio::spawn(store::dispatch_shared_prompt(
611 state.clone(),
612 dispatch_request,
613 ));
614 }
615
616 Ok(Json(json!({
617 "status": store::to_prompt_status_value(&response_status),
618 "approval": response_approval.as_ref().map(store::approval_to_json),
619 })))
620}
621
622#[derive(Debug, Deserialize)]
623#[serde(rename_all = "camelCase")]
624struct RespondApprovalRequest {
625 participant_id: String,
626 participant_token: String,
627 action: String,
628}
629
630async fn respond_shared_prompt_approval(
631 State(state): State<AppState>,
632 Path((shared_session_id, approval_id)): Path<(String, String)>,
633 Json(body): Json<RespondApprovalRequest>,
634) -> store::HandlerResult<Json<Value>> {
635 if body.action != "approve" && body.action != "reject" {
636 return Err(store::into_http_error(store::ApiErr::bad_request(
637 "INVALID_ACTION",
638 "action must be approve or reject.",
639 )));
640 }
641
642 let mut dispatch: Option<store::PromptDispatchRequest> = None;
643 let approval: store::SharedPromptApproval;
644
645 {
646 let mut shared_store = store::shared_session_store().write().await;
647 shared_store.expire_sessions();
648 let session = shared_store
649 .ensure_active_session(&shared_session_id)
650 .map_err(store::into_http_error)?
651 .clone();
652 let host_participant = shared_store
653 .authenticate_participant(
654 &shared_session_id,
655 &body.participant_id,
656 &body.participant_token,
657 )
658 .map_err(store::into_http_error)?
659 .clone();
660
661 if host_participant.role != store::SharedSessionRole::Host {
662 return Err(store::into_http_error(store::ApiErr::forbidden(
663 "HOST_REQUIRED",
664 "Only host can approve or reject prompts.",
665 )));
666 }
667
668 let approval_snapshot = {
669 let approval_entry = shared_store
670 .approvals
671 .get_mut(&approval_id)
672 .ok_or_else(|| {
673 store::into_http_error(store::ApiErr::not_found(
674 "APPROVAL_NOT_FOUND",
675 "Approval request not found.",
676 ))
677 })?;
678 if approval_entry.shared_session_id != shared_session_id {
679 return Err(store::into_http_error(store::ApiErr::not_found(
680 "APPROVAL_NOT_FOUND",
681 "Approval request not found.",
682 )));
683 }
684 if approval_entry.status != store::SharedPromptStatus::Pending {
685 return Err(store::into_http_error(store::ApiErr::conflict(
686 "APPROVAL_ALREADY_RESOLVED",
687 "Approval has already been resolved.",
688 )));
689 }
690 approval_entry.resolved_at = Some(Utc::now());
691 approval_entry.resolved_by_participant_id = Some(host_participant.id.clone());
692 if body.action == "reject" {
693 approval_entry.status = store::SharedPromptStatus::Rejected;
694 } else {
695 approval_entry.status = store::SharedPromptStatus::Approved;
696 }
697 approval_entry.clone()
698 };
699
700 if body.action == "reject" {
701 shared_store.emit_session_event(
702 &shared_session_id,
703 "prompt_rejected",
704 json!({
705 "approval": store::approval_to_json(&approval_snapshot),
706 "participant": store::participant_to_json(&host_participant, false),
707 }),
708 );
709 } else {
710 shared_store.emit_session_event(
711 &shared_session_id,
712 "prompt_approved",
713 json!({
714 "approval": store::approval_to_json(&approval_snapshot),
715 "participant": store::participant_to_json(&host_participant, false),
716 }),
717 );
718 dispatch = Some(store::PromptDispatchRequest {
719 shared_session_id: shared_session_id.clone(),
720 host_session_id: session.host_session_id.clone(),
721 approval_id: approval_snapshot.id.clone(),
722 prompt: approval_snapshot.prompt.clone(),
723 });
724 }
725
726 approval = approval_snapshot;
727 }
728
729 if let Some(dispatch_request) = dispatch {
730 tokio::spawn(store::dispatch_shared_prompt(
731 state.clone(),
732 dispatch_request,
733 ));
734 }
735
736 Ok(Json(json!({
737 "approval": store::approval_to_json(&approval),
738 })))
739}
740
741#[derive(Debug, Deserialize)]
742#[serde(rename_all = "camelCase")]
743struct SharedSessionStreamQuery {
744 participant_id: String,
745 participant_token: String,
746}
747
748async fn shared_session_stream(
749 Path(shared_session_id): Path<String>,
750 Query(query): Query<SharedSessionStreamQuery>,
751) -> store::HandlerResult<Sse<SharedSseStream>> {
752 let mut shared_store = store::shared_session_store().write().await;
753 shared_store.expire_sessions();
754 shared_store
755 .authenticate_participant(
756 &shared_session_id,
757 &query.participant_id,
758 &query.participant_token,
759 )
760 .map_err(store::into_http_error)?;
761 let mut rx = shared_store.subscribe(&shared_session_id).ok_or_else(|| {
762 store::into_http_error(store::ApiErr::not_found(
763 "SESSION_NOT_FOUND",
764 "Shared session not found.",
765 ))
766 })?;
767 drop(shared_store);
768
769 let connected = json!({
770 "type": "connected",
771 "sharedSessionId": shared_session_id,
772 "timestamp": Utc::now().to_rfc3339(),
773 });
774 let stream: SharedSseStream = Box::pin(async_stream::stream! {
775 yield Ok(Event::default().data(connected.to_string()));
776 let mut heartbeat = tokio::time::interval(Duration::from_secs(15));
777 loop {
778 tokio::select! {
779 msg = rx.recv() => {
780 match msg {
781 Ok(event) => yield Ok(Event::default().data(event.to_string())),
782 Err(broadcast::error::RecvError::Lagged(_)) => continue,
783 Err(broadcast::error::RecvError::Closed) => break,
784 }
785 }
786 _ = heartbeat.tick() => yield Ok(Event::default().comment("heartbeat")),
787 }
788 }
789 });
790
791 Ok(Sse::new(stream))
792}