use super::protocol::{
ClientMessage, CommandMessage, CostPayload, ServerMessage, SessionMessage, StatusPayload,
UserMessage,
};
use crate::config::Config;
use crate::session::cancellation::SessionCancellation;
use crate::session::chat::session::{
execute_api_call_and_process_response, prepare_for_api_call, process_layers_if_enabled,
setup_and_initialize_session, setup_system_prompt_and_cache, ChatSession, GenericSessionArgs,
};
use crate::session::output::{OutputMode, WebSocketSink};
use crate::{log_debug, log_error, log_info};
use anyhow::Result;
use futures_util::{SinkExt, StreamExt};
use std::collections::{HashMap, HashSet};
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::Mutex;
use tokio_tungstenite::tungstenite::Message;
use tokio_tungstenite::WebSocketStream;
type SessionLocks = Arc<Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>>>;
pub struct WebSocketServer {
addr: SocketAddr,
config: Arc<Config>,
role: String,
}
impl WebSocketServer {
pub fn new(host: &str, port: u16, config: Config, role: String) -> Result<Self> {
let addr: SocketAddr = format!("{}:{}", host, port).parse()?;
Ok(Self {
addr,
config: Arc::new(config),
role,
})
}
pub async fn start(&self) -> Result<()> {
let listener = TcpListener::bind(&self.addr).await?;
log_info!("WebSocket server listening on ws://{}", self.addr);
println!("🚀 WebSocket server started on ws://{}", self.addr);
println!("Press Ctrl+C to stop the server");
let sessions: Arc<Mutex<HashMap<String, ChatSession>>> =
Arc::new(Mutex::new(HashMap::new()));
let session_locks: SessionLocks = Arc::new(Mutex::new(HashMap::new()));
loop {
match listener.accept().await {
Ok((stream, peer_addr)) => {
log_info!("Connection accepted from {}", peer_addr);
let config = Arc::clone(&self.config);
let role = self.role.clone();
let sessions = Arc::clone(&sessions);
let session_locks = Arc::clone(&session_locks);
tokio::spawn(async move {
if let Err(e) = handle_connection(
stream,
peer_addr,
config,
role,
sessions,
session_locks,
)
.await
{
log_error!("Connection handler failed for {}: {}", peer_addr, e);
}
});
}
Err(e) => {
log_error!("Failed to accept connection: {}", e);
}
}
}
}
}
async fn handle_connection(
stream: TcpStream,
peer_addr: SocketAddr,
config: Arc<Config>,
role: String,
sessions: Arc<Mutex<HashMap<String, ChatSession>>>,
session_locks: Arc<Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>>>,
) -> Result<()> {
let ws_config = tokio_tungstenite::tungstenite::protocol::WebSocketConfig::default()
.max_message_size(Some(10 * 1024 * 1024)) .max_frame_size(Some(10 * 1024 * 1024)) .accept_unmasked_frames(false);
let ws_stream = tokio_tungstenite::accept_async_with_config(stream, Some(ws_config)).await?;
log_info!("WebSocket handshake completed for {}", peer_addr);
let (mut ws_sender, mut ws_receiver) = ws_stream.split();
let mut active_session_ids: HashSet<String> = HashSet::new();
let welcome = ServerMessage::status(
format!("Connected to Octomind WebSocket server (role: {})", role),
None,
);
send_message(&mut ws_sender, &welcome).await?;
let (bg_tx, mut bg_rx) = tokio::sync::mpsc::unbounded_channel::<ServerMessage>();
loop {
tokio::select! {
ws_msg = ws_receiver.next() => {
let msg = match ws_msg {
Some(msg) => msg,
None => break, };
match msg {
Ok(Message::Text(text)) => {
log_debug!("Received message from {}: {} bytes", peer_addr, text.len());
let client_msg = match serde_json::from_str::<ClientMessage>(&text) {
Ok(msg) => {
log_debug!("Parsed message: {:?}", msg);
msg
}
Err(e) => {
log_error!("Invalid JSON from {}: {}", peer_addr, e);
let error = ServerMessage::error(format!("Invalid JSON: {}", e));
send_message(&mut ws_sender, &error).await?;
continue;
}
};
if let Err(e) = client_msg.validate() {
log_error!("Message validation failed from {}: {}", peer_addr, e);
let error = ServerMessage::error(e);
send_message(&mut ws_sender, &error).await?;
continue;
}
if let Err(e) = process_client_message(
client_msg,
&mut ws_sender,
&config,
&role,
&sessions,
&session_locks,
&mut active_session_ids,
&bg_tx,
)
.await
{
log_error!("Message processing failed for {}: {}", peer_addr, e);
let error = ServerMessage::error(format!("Internal error: {}", e));
send_message(&mut ws_sender, &error).await?;
}
}
Ok(Message::Close(_)) => {
log_info!("Client {} closed connection", peer_addr);
break;
}
Ok(Message::Ping(data)) => {
log_debug!("Ping received from {}", peer_addr);
if let Err(e) = ws_sender.send(Message::Pong(data)).await {
log_error!("Failed to send pong to {}: {}", peer_addr, e);
break;
}
}
Ok(_) => {
}
Err(e) => {
log_error!("WebSocket protocol error from {}: {}", peer_addr, e);
break;
}
}
}
bg_msg = bg_rx.recv() => {
if let Some(msg) = bg_msg {
if let Err(e) = send_message(&mut ws_sender, &msg).await {
log_error!("Failed to forward background message: {}", e);
break;
}
}
}
}
}
for sid in &active_session_ids {
crate::session::context::clear_notification_sender_for_session(sid);
}
log_info!(
"Connection closed: {} (cleaned up {} session(s))",
peer_addr,
active_session_ids.len()
);
Ok(())
}
#[allow(clippy::too_many_arguments)]
async fn process_client_message(
client_msg: ClientMessage,
ws_sender: &mut futures_util::stream::SplitSink<
WebSocketStream<TcpStream>,
tokio_tungstenite::tungstenite::Message,
>,
config: &Config,
role: &str,
sessions: &Arc<Mutex<HashMap<String, ChatSession>>>,
session_locks: &SessionLocks,
active_session_ids: &mut HashSet<String>,
bg_tx: &tokio::sync::mpsc::UnboundedSender<ServerMessage>,
) -> Result<()> {
match client_msg {
ClientMessage::Session(msg) => {
handle_session_message(
msg,
ws_sender,
config,
role,
sessions,
active_session_ids,
bg_tx,
)
.await
}
ClientMessage::Message(msg) => {
let session_id = msg.session_id.clone();
active_session_ids.insert(session_id.clone());
let lock = get_or_create_session_lock(&session_id, session_locks).await;
let guard = match lock.try_lock() {
Ok(guard) => guard,
Err(_) => {
let error = ServerMessage::error(format!(
"Session '{}' is busy processing another request. Please wait.",
session_id
));
send_message(ws_sender, &error).await?;
return Ok(());
}
};
let result = crate::session::context::with_session_id(session_id, async {
handle_user_message(msg, ws_sender, config, role, sessions).await
})
.await;
drop(guard);
result
}
ClientMessage::Command(msg) => {
let session_id = msg.session_id.clone();
active_session_ids.insert(session_id.clone());
let lock = get_or_create_session_lock(&session_id, session_locks).await;
let guard = match lock.try_lock() {
Ok(guard) => guard,
Err(_) => {
let error = ServerMessage::error(format!(
"Session '{}' is busy processing another request. Please wait.",
session_id
));
send_message(ws_sender, &error).await?;
return Ok(());
}
};
let result = crate::session::context::with_session_id(session_id, async {
handle_command_message(msg, ws_sender, config, role, sessions).await
})
.await;
drop(guard);
result
}
}
}
async fn get_or_create_session_lock(
session_id: &str,
session_locks: &SessionLocks,
) -> Arc<tokio::sync::Mutex<()>> {
let mut locks = session_locks.lock().await;
locks
.entry(session_id.to_string())
.or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
.clone()
}
fn spawn_ws_inbox_monitor(
session_id: String,
sessions: Arc<Mutex<HashMap<String, ChatSession>>>,
config: Config,
role: String,
bg_tx: tokio::sync::mpsc::UnboundedSender<ServerMessage>,
) {
tokio::spawn(async move {
log_debug!(
"WebSocket: inbox monitor started for session: {}",
session_id
);
loop {
let should_exit = crate::session::context::with_session_id(session_id.clone(), async {
crate::mcp::core::flush_due_to_inbox();
while crate::session::inbox::has_inbox_messages()
&& sessions.lock().await.contains_key(&session_id)
{
let inbox_msg = match crate::session::inbox::try_pop_inbox_message() {
Some(msg) => msg,
None => break,
};
log_debug!(
"WS monitor: processing inbox message from {:?} for {}",
inbox_msg.source,
session_id
);
let mut chat_session = match sessions.lock().await.remove(&session_id) {
Some(s) => s,
None => {
crate::session::inbox::push_inbox_message(inbox_msg);
return false;
}
};
let config_for_role = config.get_merged_config_for_role(&role);
let mut cancellation = crate::session::cancellation::SessionCancellation::new();
let op_rx = cancellation.new_operation();
if let Err(e) = chat_session.add_user_message(&inbox_msg.content) {
log_error!("WS monitor: failed to add inbox message: {}", e);
sessions
.lock()
.await
.insert(session_id.clone(), chat_session);
continue;
}
if let Err(e) =
prepare_for_api_call(&mut chat_session, &config_for_role, op_rx.clone())
.await
{
log_error!("WS monitor: failed to prepare API call: {}", e);
sessions
.lock()
.await
.insert(session_id.clone(), chat_session);
continue;
}
let (ws_tx, mut ws_rx) =
tokio::sync::mpsc::unbounded_channel::<ServerMessage>();
let ws_sink = WebSocketSink::new(ws_tx.clone());
crate::mcp::process::set_notification_sender(Some(session_id.clone()), ws_tx);
let bg_tx_fwd = bg_tx.clone();
let forward_task = tokio::spawn(async move {
while let Some(msg) = ws_rx.recv().await {
if bg_tx_fwd.send(msg).is_err() {
break; }
}
});
let result = execute_api_call_and_process_response(
&mut chat_session,
&config_for_role,
&role,
op_rx,
OutputMode::WebSocket,
ws_sink,
)
.await;
crate::mcp::process::clear_notification_sender(Some(session_id.clone()));
let _ = forward_task.await;
if let Err(e) = result {
log_debug!("WS monitor: error processing inbox message: {}", e);
}
let total_tokens = chat_session.session.info.input_tokens
+ chat_session.session.info.output_tokens
+ chat_session.session.info.cache_read_tokens
+ chat_session.session.info.cache_write_tokens
+ chat_session.session.info.reasoning_tokens;
let _ = bg_tx.send(ServerMessage::Cost(CostPayload {
session_tokens: total_tokens,
session_cost: chat_session.session.info.total_cost,
input_tokens: chat_session.session.info.input_tokens,
output_tokens: chat_session.session.info.output_tokens,
cache_read_tokens: chat_session.session.info.cache_read_tokens,
cache_write_tokens: chat_session.session.info.cache_write_tokens,
reasoning_tokens: chat_session.session.info.reasoning_tokens,
session_id: session_id.clone(),
}));
if let Err(e) = chat_session.save() {
log_error!("WS monitor: failed to save session: {}", e);
}
sessions
.lock()
.await
.insert(session_id.clone(), chat_session);
}
false })
.await;
if should_exit || bg_tx.is_closed() {
break;
}
let inbox_gone = crate::session::context::with_session_id(session_id.clone(), async {
crate::session::inbox::get_inbox_notify().is_none()
})
.await;
if inbox_gone {
log_debug!("WS monitor: inbox cleared for {}, exiting", session_id);
break;
}
crate::session::context::with_session_id(session_id.clone(), async {
let inbox_notify = crate::session::inbox::get_inbox_notify();
tokio::select! {
_ = crate::mcp::core::next_schedule_sleep() => {}
_ = async {
if let Some(notify) = inbox_notify {
notify.notified().await;
} else {
std::future::pending::<()>().await;
}
} => {}
}
})
.await;
}
log_debug!(
"WebSocket: inbox monitor exited for session: {}",
session_id
);
});
}
async fn handle_session_message(
msg: SessionMessage,
ws_sender: &mut futures_util::stream::SplitSink<
WebSocketStream<TcpStream>,
tokio_tungstenite::tungstenite::Message,
>,
config: &Config,
role: &str,
sessions: &Arc<Mutex<HashMap<String, ChatSession>>>,
active_session_ids: &mut HashSet<String>,
bg_tx: &tokio::sync::mpsc::UnboundedSender<ServerMessage>,
) -> Result<()> {
log_debug!("Handling session message: session_id={:?}", msg.session_id);
let (mut chat_session, config_for_role, session_role, is_new) = match &msg.session_id {
Some(session_id) => {
let existing = sessions.lock().await.remove(session_id);
if let Some(session) = existing {
log_debug!("Resumed session from memory: {}", session_id);
let cfg = config.get_merged_config_for_role(role);
let role_name = role.to_string();
(session, cfg, role_name, false)
} else {
let args = if crate::session::get_sessions_dir()
.map(|d| d.join(format!("{}.jsonl", session_id)).exists())
.unwrap_or(false)
{
log_debug!("Resuming session from disk: {}", session_id);
GenericSessionArgs {
resume: Some(session_id.clone()),
role: role.to_string(),
mode: "websocket".into(),
..Default::default()
}
} else {
log_debug!("Creating named session: {}", session_id);
GenericSessionArgs {
name: Some(session_id.clone()),
role: role.to_string(),
mode: "websocket".into(),
..Default::default()
}
};
match setup_and_initialize_session(&args, config).await {
Ok((session, cfg, role_name, _, _)) => {
let is_new = !session.was_resumed;
(session, cfg, role_name, is_new)
}
Err(e) => {
let error =
ServerMessage::error(format!("Failed to initialize session: {}", e));
send_message(ws_sender, &error).await?;
return Ok(());
}
}
}
}
None => {
log_debug!("Creating new auto-named session with role: {}", role);
let args = GenericSessionArgs {
role: role.to_string(),
mode: "websocket".into(),
..Default::default()
};
match setup_and_initialize_session(&args, config).await {
Ok((session, cfg, role_name, _, _)) => (session, cfg, role_name, true),
Err(e) => {
let error = ServerMessage::error(format!("Failed to create session: {}", e));
send_message(ws_sender, &error).await?;
return Ok(());
}
}
}
};
let session_id = chat_session.session.info.name.clone();
active_session_ids.insert(session_id.clone());
let role_for_pool = session_role.clone();
crate::session::context::with_session_id(session_id.clone(), async {
crate::session::context::init_session_services(&role_for_pool);
crate::mcp::core::plan::core::restore_plan_for_session(&session_id);
crate::mcp::core::skill_auto::load_env_skills(&mut chat_session).await;
setup_system_prompt_and_cache(&mut chat_session, &config_for_role, &session_role, false)
.await?;
let status_msg = if is_new {
format!("Session created: {}", session_id)
} else {
format!("Session resumed: {}", session_id)
};
log_info!("{}", status_msg);
chat_session.save()?;
sessions
.lock()
.await
.insert(session_id.clone(), chat_session);
send_message(
ws_sender,
&ServerMessage::status(status_msg, Some(session_id.clone())),
)
.await?;
Ok::<(), anyhow::Error>(())
})
.await?;
spawn_ws_inbox_monitor(
session_id,
Arc::clone(sessions),
config.clone(),
role.to_string(),
bg_tx.clone(),
);
Ok(())
}
async fn lookup_session(
session_id: &str,
sessions: &Arc<Mutex<HashMap<String, ChatSession>>>,
config: &Config,
role: &str,
) -> std::result::Result<ChatSession, ServerMessage> {
let existing = sessions.lock().await.remove(session_id);
if let Some(session) = existing {
log_debug!("Resumed session from memory: {}", session_id);
return Ok(session);
}
log_debug!("Loading session from disk: {}", session_id);
let mut args = GenericSessionArgs::resume(session_id.to_string(), role.to_string());
args.mode = "websocket".to_string();
match setup_and_initialize_session(&args, config).await {
Ok((mut session, config_for_role, session_role, _, _)) => {
if let Err(e) =
setup_system_prompt_and_cache(&mut session, &config_for_role, &session_role, false)
.await
{
return Err(ServerMessage::error(format!(
"Failed to setup session {}: {}",
session_id, e
)));
}
log_info!("Session loaded from disk: {}", session_id);
Ok(session)
}
Err(_) => Err(ServerMessage::error(format!(
"Session not found: {}. Send a \"session\" message first to create or resume a session.",
session_id
))),
}
}
async fn handle_command_message(
msg: CommandMessage,
ws_sender: &mut futures_util::stream::SplitSink<
WebSocketStream<TcpStream>,
tokio_tungstenite::tungstenite::Message,
>,
config: &Config,
role: &str,
sessions: &Arc<Mutex<HashMap<String, ChatSession>>>,
) -> Result<()> {
let session_id = msg.session_id.as_str();
let command_name = msg.command.trim();
let args = msg.args.as_slice();
let slash_command = if args.is_empty() {
format!("/{}", command_name)
} else {
format!("/{} {}", command_name, args.join(" "))
};
log_debug!(
"Handling command message: session_id={}, command={}",
session_id,
slash_command
);
let mut chat_session = match lookup_session(session_id, sessions, config, role).await {
Ok(s) => s,
Err(error) => {
send_message(ws_sender, &error).await?;
return Ok(());
}
};
let session_id = session_id.to_string();
let config_for_role = config.get_merged_config_for_role(role);
let mut cancellation = SessionCancellation::new();
let operation_rx = cancellation.new_operation();
if command_name == "done" {
use crate::session::chat::session::commands::handle_done;
match handle_done(&mut chat_session, &config_for_role, operation_rx).await {
Ok(_) => {
let status = ServerMessage::status(
"Conversation compressed".to_string(),
Some(session_id.clone()),
);
send_message(ws_sender, &status).await?;
}
Err(e) => {
let error = ServerMessage::error(format!("Compression failed: {}", e));
send_message(ws_sender, &error).await?;
}
}
chat_session.save()?;
sessions.lock().await.insert(session_id, chat_session);
return Ok(());
}
use crate::session::chat::session::commands::CommandResult;
let command_result = chat_session
.process_command(
&slash_command,
&mut config_for_role.clone(),
role,
operation_rx,
)
.await?;
match command_result {
CommandResult::Handled => {
log_debug!("Command '{}' executed successfully", slash_command);
let status = ServerMessage::status(
format!("Command '{}' executed successfully", slash_command),
Some(session_id.clone()),
);
send_message(ws_sender, &status).await?;
}
CommandResult::HandledWithOutput(command_output) => {
log_debug!(
"Command '{}' executed with structured output",
slash_command
);
let response = ServerMessage::Status(StatusPayload {
message: format!("Command '{}' executed successfully", slash_command),
session_id: Some(session_id.clone()),
data: Some(command_output.to_json()),
});
send_message(ws_sender, &response).await?;
}
CommandResult::Exit => {
log_info!("Session ended by command '{}'", slash_command);
let status =
ServerMessage::status("Session ended".to_string(), Some(session_id.clone()));
send_message(ws_sender, &status).await?;
return Ok(());
}
CommandResult::TreatAsUserInput => {
let error = ServerMessage::error(format!(
"Unknown command: '{}'. Use type \"message\" to send user input.",
slash_command
));
send_message(ws_sender, &error).await?;
}
}
chat_session.save()?;
sessions.lock().await.insert(session_id, chat_session);
Ok(())
}
async fn handle_user_message(
msg: UserMessage,
ws_sender: &mut futures_util::stream::SplitSink<
WebSocketStream<TcpStream>,
tokio_tungstenite::tungstenite::Message,
>,
config: &Config,
role: &str,
sessions: &Arc<Mutex<HashMap<String, ChatSession>>>,
) -> Result<()> {
let session_id = msg.session_id.as_str();
let input = msg.content.clone();
log_debug!(
"Handling user message: session_id={}, content_len={}",
session_id,
input.len()
);
let mut chat_session = match lookup_session(session_id, sessions, config, role).await {
Ok(s) => s,
Err(error) => {
send_message(ws_sender, &error).await?;
return Ok(());
}
};
let session_id = session_id.to_string();
let current_dir = crate::mcp::get_thread_working_directory();
let config_for_role = config.get_merged_config_for_role(role);
let mut cancellation = SessionCancellation::new();
let operation_rx = cancellation.new_operation();
{
crate::mcp::core::flush_due_to_inbox();
while let Some(inbox_msg) = crate::session::inbox::try_pop_inbox_message() {
log_debug!(
"WebSocket pre-user: processing inbox message from {:?}",
inbox_msg.source
);
chat_session.add_user_message(&inbox_msg.content)?;
let op_rx = cancellation.new_operation();
prepare_for_api_call(&mut chat_session, &config_for_role, op_rx.clone()).await?;
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
let sink = WebSocketSink::new(tx);
let result = execute_api_call_and_process_response(
&mut chat_session,
&config_for_role,
role,
op_rx,
OutputMode::WebSocket,
sink,
)
.await;
while let Ok(msg) = rx.try_recv() {
send_message(ws_sender, &msg).await?;
}
if let Err(e) = result {
log_debug!("Error processing pre-user inbox message (websocket): {}", e);
}
}
}
let first_message_processed = !chat_session.session.messages.is_empty();
log_debug!(
"Processing input through layers: first_message={}",
!first_message_processed
);
let (processed_input, layers_modified_session, _layer_cancelled) = process_layers_if_enabled(
&input,
&mut chat_session,
&config_for_role,
role,
first_message_processed,
operation_rx.clone(),
)
.await?;
if !layers_modified_session && chat_session.first_prompt_idx.is_none() {
chat_session.first_prompt_idx = Some(chat_session.session.messages.len());
}
let _compression_occurred =
match crate::session::chat::conversation_compression::check_and_compress_conversation(
&mut chat_session,
&config_for_role,
operation_rx.clone(),
false,
)
.await
{
Ok(compressed) => compressed,
Err(e) => {
log_debug!(
"Conversation compression failed: {}. Continuing session.",
e
);
false
}
};
if !layers_modified_session {
let final_input_with_constraints =
crate::session::chat::session::utils::append_constraints_if_exists(
&processed_input,
&config_for_role.custom_constraints_file_name,
¤t_dir,
);
chat_session.add_user_message(&final_input_with_constraints)?;
}
prepare_for_api_call(&mut chat_session, &config_for_role, operation_rx.clone()).await?;
let (ws_tx, mut ws_rx) = tokio::sync::mpsc::unbounded_channel();
let ws_sink = WebSocketSink::new(ws_tx.clone());
crate::mcp::process::set_notification_sender(Some(session_id.clone()), ws_tx);
let api_result = execute_api_call_and_process_response(
&mut chat_session,
&config_for_role,
role,
operation_rx.clone(),
OutputMode::WebSocket,
ws_sink,
)
.await;
while let Ok(msg) = ws_rx.try_recv() {
send_message(ws_sender, &msg).await?;
}
match api_result {
Ok(_) => {
let total_tokens = chat_session.session.info.input_tokens
+ chat_session.session.info.output_tokens
+ chat_session.session.info.cache_read_tokens
+ chat_session.session.info.cache_write_tokens
+ chat_session.session.info.reasoning_tokens;
let cost_msg = ServerMessage::Cost(CostPayload {
session_tokens: total_tokens,
session_cost: chat_session.session.info.total_cost,
input_tokens: chat_session.session.info.input_tokens,
output_tokens: chat_session.session.info.output_tokens,
cache_read_tokens: chat_session.session.info.cache_read_tokens,
cache_write_tokens: chat_session.session.info.cache_write_tokens,
reasoning_tokens: chat_session.session.info.reasoning_tokens,
session_id: session_id.clone(),
});
send_message(ws_sender, &cost_msg).await?;
}
Err(e) => {
log_error!("API call failed: {}", e);
let error = ServerMessage::error(format!("Error: {}", e));
send_message(ws_sender, &error).await?;
}
}
log_debug!("Saving session: {}", session_id);
chat_session.save()?;
crate::mcp::process::clear_notification_sender(Some(session_id.clone()));
sessions
.lock()
.await
.insert(session_id.clone(), chat_session);
if let Some(notify) = crate::session::inbox::get_inbox_notify() {
notify.notify_one();
}
log_debug!("Session stored back in memory: {}", session_id);
Ok(())
}
async fn send_message(
ws_sender: &mut futures_util::stream::SplitSink<
WebSocketStream<TcpStream>,
tokio_tungstenite::tungstenite::Message,
>,
msg: &ServerMessage,
) -> Result<()> {
let json = serde_json::to_string(msg)?;
log_debug!(
"Sending message: type={:?}, size={} bytes",
std::mem::discriminant(msg),
json.len()
);
ws_sender.send(Message::text(json)).await?;
Ok(())
}