use crate::channels::{Channel, ChannelConnection};
use crate::error::{Result, SpiceError};
use crate::protocol::*;
use crate::utils::sleep;
use binrw::BinRead;
use instant::{Duration, Instant};
use tracing::{debug, error, info, warn};
pub struct MainChannel {
connection: ChannelConnection,
session_id: Option<u32>,
}
impl MainChannel {
pub async fn new(host: &str, port: u16) -> Result<Self> {
let mut connection = ChannelConnection::new(host, port, ChannelType::Main, 0).await?;
connection.handshake().await?;
Ok(Self {
connection,
session_id: None,
})
}
#[cfg(target_arch = "wasm32")]
pub async fn new_websocket(websocket_url: &str) -> Result<Self> {
Self::new_websocket_with_auth(websocket_url, None).await
}
#[cfg(target_arch = "wasm32")]
pub async fn new_websocket_with_auth(
websocket_url: &str,
auth_token: Option<String>,
) -> Result<Self> {
let mut connection = ChannelConnection::new_websocket_with_auth(
websocket_url,
ChannelType::Main,
0,
auth_token,
)
.await?;
connection.handshake().await?;
Ok(Self {
connection,
session_id: None,
})
}
#[cfg(target_arch = "wasm32")]
pub async fn new_websocket_with_password(
websocket_url: &str,
auth_token: Option<String>,
password: Option<String>,
) -> Result<Self> {
let mut connection = ChannelConnection::new_websocket_with_auth(
websocket_url,
ChannelType::Main,
0,
auth_token,
)
.await?;
if let Some(password) = password {
connection.set_password(password);
}
connection.handshake().await?;
Ok(Self {
connection,
session_id: None,
})
}
pub fn get_session_id(&self) -> Option<u32> {
self.session_id
}
pub async fn send_attach_channels(&mut self) -> Result<()> {
self.connection
.send_message(crate::protocol::SPICE_MSGC_MAIN_ATTACH_CHANNELS, &[])
.await?;
info!("Sent ATTACH_CHANNELS message");
Ok(())
}
pub async fn initialize(&mut self) -> Result<()> {
info!("Trying to receive server init message or proceeding with client-initiated flow");
#[cfg(target_arch = "wasm32")]
{
use gloo_timers::future::TimeoutFuture;
let read_future = self.connection.read_message();
let timeout_future = TimeoutFuture::new(2000);
match tokio::select! {
result = read_future => Some(result),
_ = timeout_future => {
info!("No server init message received, trying client-initiated flow");
None
}
} {
Some(Ok((header, data))) => {
info!(
"Received server message: type={}, size={}",
header.msg_type, header.msg_size
);
if header.msg_type == MainChannelMessage::Init as u16 {
info!("Received SpiceMsgMainInit from server");
self.handle_message(&header, &data).await?;
} else {
info!(
"Unexpected first message type: {}, handling anyway",
header.msg_type
);
self.handle_message(&header, &data).await?;
}
}
Some(Err(e)) => {
info!(
"Error waiting for server init: {}, trying client-initiated flow",
e
);
self.try_client_initiated_flow().await?;
}
None => {
self.try_client_initiated_flow().await?;
}
}
}
#[cfg(not(target_arch = "wasm32"))]
{
match tokio::time::timeout(
std::time::Duration::from_millis(2000),
self.connection.read_message(),
)
.await
{
Ok(Ok((header, data))) => {
info!(
"Received server message: type={}, size={}",
header.msg_type, header.msg_size
);
if header.msg_type == MainChannelMessage::Init as u16 {
info!("Received SpiceMsgMainInit from server");
self.handle_message(&header, &data).await?;
} else {
info!(
"Unexpected first message type: {}, handling anyway",
header.msg_type
);
self.handle_message(&header, &data).await?;
}
}
Ok(Err(e)) => {
info!(
"Error waiting for server init: {}, trying client-initiated flow",
e
);
self.try_client_initiated_flow().await?;
}
Err(_) => {
info!("No server init message received, trying client-initiated flow");
self.try_client_initiated_flow().await?;
}
}
}
info!("SPICE main channel ready");
Ok(())
}
async fn try_client_initiated_flow(&mut self) -> Result<()> {
info!("Waiting for server to send SPICE_MSG_MAIN_INIT");
let start_time = Instant::now();
let timeout_duration = Duration::from_secs(5);
while start_time.elapsed() < timeout_duration {
match self.connection.read_message().await {
Ok((header, data)) => {
info!(
"Received message while waiting for init: type={}, size={}",
header.msg_type, header.msg_size
);
self.handle_message(&header, &data).await?;
if header.msg_type == MainChannelMessage::Init as u16 {
info!("Successfully received SPICE_MSG_MAIN_INIT");
return Ok(());
}
}
Err(e) => {
if !e.to_string().contains("timeout") {
debug!("Error while waiting for init: {}", e);
}
sleep(Duration::from_millis(50)).await;
}
}
}
warn!("Timeout waiting for SPICE_MSG_MAIN_INIT, proceeding anyway");
Ok(())
}
pub async fn get_channels_list(&mut self) -> Result<Vec<(ChannelType, u8)>> {
info!("Waiting for server to send SPICE_MSG_MAIN_CHANNELS_LIST");
let start_time = Instant::now();
let timeout = Duration::from_secs(2);
while start_time.elapsed() < timeout {
match self.connection.read_message().await {
Ok((header, data)) => {
if header.msg_type == MainChannelMessage::ChannelsList as u16 {
info!("Received SPICE_MSG_MAIN_CHANNELS_LIST");
let mut channels = Vec::new();
if data.len() >= 4 {
let num_channels =
u32::from_le_bytes([data[0], data[1], data[2], data[3]]) as usize;
info!("Server reports {} channels", num_channels);
let mut offset = 4;
for i in 0..num_channels {
if offset + 1 < data.len() {
let channel_type = data[offset];
let channel_id = data[offset + 1];
let ch_type = ChannelType::from(channel_type);
channels.push((ch_type, channel_id));
info!("Channel {}: type={:?}, id={}", i, ch_type, channel_id);
offset += 2;
}
}
}
return Ok(channels);
} else {
self.handle_message(&header, &data).await?;
}
}
Err(e) => {
if !e.to_string().contains("timeout") {
warn!("Error while waiting for channels list: {}", e);
}
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
}
}
}
warn!("Timeout waiting for SPICE_MSG_MAIN_CHANNELS_LIST, using defaults");
let channels = vec![
(ChannelType::Display, 0),
(ChannelType::Inputs, 0),
(ChannelType::Cursor, 0),
];
Ok(channels)
}
pub async fn run(&mut self) -> Result<()> {
loop {
let (header, data) = self.connection.read_message().await?;
self.handle_message(&header, &data).await?;
}
}
}
impl Channel for MainChannel {
async fn handle_message(&mut self, header: &SpiceDataHeader, data: &[u8]) -> Result<()> {
debug!(
"Main channel message: serial={}, type={}, size={}, sub_list={}",
header.serial, header.msg_type, header.msg_size, header.sub_list
);
if header.msg_type < 100 {
match header.msg_type {
x if x == crate::protocol::SPICE_MSG_PING => {
debug!(
"Received SPICE_MSG_PING with {} bytes, sending PONG",
data.len()
);
if data.len() >= 4 {
let id = u32::from_le_bytes([data[0], data[1], data[2], data[3]]);
let time = if data.len() >= 12 {
u64::from_le_bytes([
data[4], data[5], data[6], data[7], data[8], data[9], data[10],
data[11],
])
} else {
0
};
debug!("PING id: {}, time: {}", id, time);
let mut pong_data = Vec::with_capacity(12);
pong_data.extend_from_slice(&id.to_le_bytes());
pong_data.extend_from_slice(&time.to_le_bytes());
self.connection
.send_message(crate::protocol::SPICE_MSGC_PONG, &pong_data)
.await?;
debug!("Sent PONG response (12 bytes)");
} else {
warn!("PING message too small ({} bytes), ignoring", data.len());
}
return Ok(());
}
x if x == crate::protocol::SPICE_MSG_SET_ACK => {
debug!("Received SPICE_MSG_SET_ACK");
return Ok(());
}
x if x == crate::protocol::SPICE_MSG_NOTIFY => {
debug!("Received SPICE_MSG_NOTIFY");
return Ok(());
}
x if x == crate::protocol::SPICE_MSG_DISCONNECTING => {
info!("Server sent SPICE_MSG_DISCONNECTING");
return Err(SpiceError::ConnectionClosed);
}
_ => {
warn!("Unknown common message type: {}", header.msg_type);
return Ok(());
}
}
}
match header.msg_type {
x if x == MainChannelMessage::Init as u16 => {
info!(
"Raw SPICE_MSG_MAIN_INIT data ({} bytes): {:?}",
data.len(),
data
);
if data.len() >= 32 {
let field1 = u32::from_le_bytes([data[0], data[1], data[2], data[3]]);
let field2 = u32::from_le_bytes([data[4], data[5], data[6], data[7]]);
let field3 = u32::from_le_bytes([data[8], data[9], data[10], data[11]]);
let field4 = u32::from_le_bytes([data[12], data[13], data[14], data[15]]);
info!(
"Manual parse - Field 1 (session_id?): {} (0x{:08x})",
field1, field1
);
info!("Manual parse - Field 2: {} (0x{:08x})", field2, field2);
info!("Manual parse - Field 3: {} (0x{:08x})", field3, field3);
info!("Manual parse - Field 4: {} (0x{:08x})", field4, field4);
}
let mut cursor = std::io::Cursor::new(data);
let init_msg = crate::protocol::SpiceMsgMainInit::read(&mut cursor)
.map_err(|e| SpiceError::Protocol(format!("Failed to parse Init: {e}")))?;
let session_id_bytes = init_msg.session_id.to_le_bytes();
info!(
"Parsed SPICE_MSG_MAIN_INIT: session_id={} (0x{:08x}, bytes={:?})",
init_msg.session_id, init_msg.session_id, session_id_bytes
);
info!(
" display_hint={}, mouse_modes={:x}",
init_msg.display_channels_hint, init_msg.supported_mouse_modes
);
info!(
" Server mouse mode: {}, agent_connected: {}",
init_msg.current_mouse_mode, init_msg.agent_connected
);
self.session_id = Some(init_msg.session_id);
debug!("Sending ATTACH_CHANNELS message");
self.send_attach_channels().await?;
}
x if x == MainChannelMessage::ChannelsList as u16 => {
debug!("Received channels list");
}
x if x == MainChannelMessage::MouseMode as u16 => {
let mut cursor = std::io::Cursor::new(data);
let mouse_mode = SpiceMsgMainMouseMode::read(&mut cursor)
.map_err(|e| SpiceError::Protocol(format!("Failed to parse MouseMode: {e}")))?;
info!("Mouse mode changed to: {}", mouse_mode.mode);
}
x if x == MainChannelMessage::MultiMediaTime as u16 => {
let mut cursor = std::io::Cursor::new(data);
let mm_time = SpiceMsgMainMultiMediaTime::read(&mut cursor).map_err(|e| {
SpiceError::Protocol(format!("Failed to parse MultiMediaTime: {e}"))
})?;
debug!("Multimedia time: {}", mm_time.time);
}
x if x == MainChannelMessage::AgentConnected as u16 => {
let mut cursor = std::io::Cursor::new(data);
let agent_connected =
SpiceMsgMainAgentConnected::read(&mut cursor).map_err(|e| {
SpiceError::Protocol(format!("Failed to parse AgentConnected: {e}"))
})?;
info!(
"Agent connected with error code: {}",
agent_connected.error_code
);
}
x if x == MainChannelMessage::AgentDisconnected as u16 => {
info!("Agent disconnected");
}
x if x == MainChannelMessage::AgentData as u16 => {
let mut cursor = std::io::Cursor::new(data);
let agent_data = SpiceMsgMainAgentData::read(&mut cursor)
.map_err(|e| SpiceError::Protocol(format!("Failed to parse AgentData: {e}")))?;
debug!(
"Received agent data: protocol {}, type {}, size {}",
agent_data.protocol, agent_data.type_, agent_data.size
);
}
x if x == MainChannelMessage::AgentToken as u16 => {
let mut cursor = std::io::Cursor::new(data);
let agent_tokens = SpiceMsgMainAgentTokens::read(&mut cursor).map_err(|e| {
SpiceError::Protocol(format!("Failed to parse AgentTokens: {e}"))
})?;
debug!("Agent tokens: {}", agent_tokens.num_tokens);
}
x if x == SPICE_MSG_NOTIFY => {
let mut cursor = std::io::Cursor::new(data);
let notify = SpiceMsgMainNotify::read(&mut cursor)
.map_err(|e| SpiceError::Protocol(format!("Failed to parse Notify: {e}")))?;
let message = String::from_utf8_lossy(¬ify.message);
match notify.severity {
0 => info!("Server info: {}", message),
1 => warn!("Server warning: {}", message),
2 => error!("Server error: {}", message),
_ => debug!(
"Server notification (severity {}): {}",
notify.severity, message
),
}
}
x if x == SPICE_MSG_DISCONNECTING => {
info!("Server is disconnecting");
return Err(SpiceError::ConnectionClosed);
}
x if x == MainChannelMessage::Name as u16 => {
debug!("Received VM name");
if data.len() >= 4 {
let name_len =
u32::from_le_bytes([data[0], data[1], data[2], data[3]]) as usize;
if data.len() >= 4 + name_len {
let name = String::from_utf8_lossy(&data[4..4 + name_len]);
info!("VM name: {}", name);
}
}
}
x if x == MainChannelMessage::Uuid as u16 => {
debug!("Received VM UUID");
if data.len() >= 16 {
let uuid_bytes: [u8; 16] = data[0..16].try_into().unwrap_or_default();
info!("VM UUID: {:02x?}", uuid_bytes);
}
}
x if x == MainChannelMessage::MigrateBegin as u16 => {
warn!("Migration begin not implemented");
}
x if x == MainChannelMessage::MigrateCancel as u16 => {
warn!("Migration cancel not implemented");
}
x if x == MainChannelMessage::MigrateSwitchHost as u16 => {
warn!("Migration switch host not implemented");
}
x if x == MainChannelMessage::MigrateEnd as u16 => {
warn!("Migration end not implemented");
}
x if x == MainChannelMessage::AgentConnectedTokens as u16 => {
warn!("Agent connected tokens not implemented");
}
x if x == MainChannelMessage::MigrateBeginSeamless as u16 => {
warn!("Seamless migration begin not implemented");
}
x if x == MainChannelMessage::MigrateDstSeamlessAck as u16 => {
warn!("Seamless migration dst ack not implemented");
}
x if x == MainChannelMessage::MigrateDstSeamlessNack as u16 => {
warn!("Seamless migration dst nack not implemented");
}
x if x == SPICE_MSG_MIGRATE => {
warn!("Migration message not implemented");
}
x if x == SPICE_MSG_MIGRATE_DATA => {
warn!("Migration data message not implemented");
}
x if x == SPICE_MSG_WAIT_FOR_CHANNELS => {
info!("Received SPICE_MSG_WAIT_FOR_CHANNELS");
let mut cursor = std::io::Cursor::new(data);
let wait_msg = crate::protocol::SpiceMsgWaitForChannels::read(&mut cursor)
.map_err(|e| {
SpiceError::Protocol(format!("Failed to parse WaitForChannels: {e}"))
})?;
info!(
"Server requesting synchronization for {} channels",
wait_msg.wait_count
);
for (idx, channel_id) in wait_msg.wait_list.iter().enumerate() {
info!(
" Channel {}: type={}, id={}",
idx, channel_id.type_, channel_id.id
);
}
warn!("Full wait-for-channels synchronization not yet implemented");
warn!("Channels will continue without explicit synchronization");
}
_ => {
warn!("Unknown message type: {}", header.msg_type);
}
}
Ok(())
}
fn channel_type(&self) -> ChannelType {
ChannelType::Main
}
}