//! WebSocket streaming tests for real-time bidirectional communication
//!
//! This test suite validates the WebSocket streaming functionality including:
//! - WebSocket connection establishment and management
//! - Real-time bidirectional message streaming
//! - Connection persistence and automatic reconnection
//! - Message queuing for reliable delivery
//! - Graceful fallback to HTTP streaming when WebSocket unavailable
//! - Connection pooling for multiple concurrent streams
//! - Authentication and authorization over WebSocket
// Additional features:
// - Compression support for efficient data transfer
// - Heartbeat/keepalive mechanism
// - Connection state monitoring and events
// - Error handling and connection recovery
#![ allow( clippy::std_instead_of_core ) ] // std required for async operations and sync primitives
#[ cfg( feature = "websocket_streaming" ) ] // Feature gate - not implemented yet
use api_ollama::*;
#[ cfg( feature = "websocket_streaming" ) ]
use std::time::Duration;
#[ cfg( feature = "websocket_streaming" ) ]
use std::sync::Arc;
#[ cfg( feature = "websocket_streaming" ) ]
use tokio::sync::Mutex;
#[ cfg( feature = "websocket_streaming" ) ]
mod integration_tests
{
use super::*;
/// Verifies that a client can open a WebSocket connection and close it gracefully.
///
/// Connects to `ws://localhost:11434/api/ws` with a five second timeout, checks the
/// connection reports `Connected`, then disconnects and checks it reports `Disconnected`.
///
/// # Errors
///
/// Returns the underlying error if client construction, connecting or disconnecting fails.
#[ tokio::test ]
async fn test_websocket_connection_establishment() -> Result< (), Box< dyn std::error::Error > >
{
  let config = WebSocketConfig::new()
    .with_url( "ws://localhost:11434/api/ws" )
    .with_timeout( Duration::from_secs( 5 ) );
  let websocket_client = WebSocketClient::new( config )?;

  // Propagate a connection failure with `?` so the test output carries the real
  // error, rather than a bare `assertion failed: connection_result.is_ok()`.
  let connection = websocket_client.connect().await?;
  assert_eq!( connection.state(), WebSocketState::Connected );
  assert!( connection.is_connected() );

  // A graceful disconnect must leave the connection in the `Disconnected` state.
  connection.disconnect().await?;
  assert_eq!( connection.state(), WebSocketState::Disconnected );
  Ok( () )
}
/// Streams a chat completion over a compression-enabled WebSocket connection.
///
/// Reads at most three streamed responses, requiring each one to carry non-empty
/// message content and at least one response to arrive before disconnecting.
#[ tokio::test ]
async fn test_websocket_message_streaming() -> Result< (), Box< dyn std::error::Error > >
{
  let ws_config = WebSocketConfig::new()
    .with_url( "ws://localhost:11434/api/ws" )
    .with_compression( true );
  let client = WebSocketClient::new( ws_config )?;
  let connection = client.connect().await?;

  // Single user prompt, sent with streaming turned on.
  let prompt = ChatMessage
  {
    role: MessageRole::User,
    content: "Hello, how are you?".to_string(),
    images: None,
    #[ cfg( feature = "tool_calling" ) ]
    tool_calls: None,
  };
  let chat_request = ChatRequest
  {
    model: "llama2".to_string(),
    messages: vec![ prompt ],
    stream: Some( true ),
    options: None,
    #[ cfg( feature = "tool_calling" ) ]
    tools: None,
    #[ cfg( feature = "tool_calling" ) ]
    tool_messages: None,
  };

  let mut responses = connection.stream_chat( chat_request ).await?;
  let mut received = 0;
  while let Some( item ) = responses.next().await
  {
    // A stream error aborts the test; `?` converts it into the boxed error type.
    let response = item?;
    received += 1;
    assert!( !response.message.content.is_empty() );
    // A handful of chunks is enough; the stream is not drained to the end.
    if received >= 3
    {
      break;
    }
  }
  assert!( received > 0 );

  connection.disconnect().await?;
  Ok( () )
}
// DISABLED: 2025-11-07 by Development Team
// REASON: Integration test requires WebSocket infrastructure not available in CI
// RE-ENABLE: When WebSocket infrastructure is available for CI testing
// APPROVED: Team Lead
// TRACKING: issue-test-websocket-disabled
/// Keeps a heartbeat-enabled connection idle for five seconds and checks that it
/// is still connected and that its metrics show heartbeats and elapsed uptime.
#[ tokio::test ]
#[ ignore = "Integration test disabled - requires stable server infrastructure" ]
async fn test_websocket_connection_persistence() -> Result< (), Box< dyn std::error::Error > >
{
  let heartbeat_every = Duration::from_secs( 2 );
  let idle_period = Duration::from_secs( 5 );

  let ws_config = WebSocketConfig::new()
    .with_url( "ws://localhost:11434/api/ws" )
    .with_heartbeat_interval( heartbeat_every )
    .with_keepalive( true );
  let client = WebSocketClient::new( ws_config )?;
  let connection = client.connect().await?;

  // Idle long enough for two full heartbeat intervals to elapse.
  tokio::time::sleep( idle_period ).await;

  // The connection must have survived the idle period.
  assert!( connection.is_connected() );
  assert_eq!( connection.state(), WebSocketState::Connected );

  // Metrics must reflect the heartbeats and the time spent connected.
  let snapshot = connection.get_metrics();
  assert!( snapshot.heartbeat_count > 0 );
  assert!( snapshot.uptime > Duration::from_secs( 4 ) );

  connection.disconnect().await?;
  Ok( () )
}
// DISABLED: 2025-11-07 by Development Team
// REASON: Requires external WebSocket infrastructure not available in CI
// RE-ENABLE: When WebSocket infrastructure is available for CI testing
// APPROVED: Team Lead
// TRACKING: issue-test-websocket-disabled
/// Drops an auto-reconnecting connection on purpose and checks that it passes
/// through `Reconnecting` and is back in `Connected` two seconds later, with the
/// reconnect recorded in its metrics.
#[ tokio::test ]
#[ ignore = "Integration test disabled - requires stable server infrastructure" ]
async fn test_websocket_automatic_reconnection() -> Result< (), Box< dyn std::error::Error > >
{
  let retry_every = Duration::from_millis( 500 );
  let ws_config = WebSocketConfig::new()
    .with_url( "ws://localhost:11434/api/ws" )
    .with_auto_reconnect( true )
    .with_reconnect_interval( retry_every )
    .with_max_reconnection_attempts( 3 );
  let client = WebSocketClient::new( ws_config )?;
  let connection = client.connect().await?;

  // Force a drop; the connection is expected to start reconnecting right away.
  connection.simulate_connection_drop().await?;
  assert_eq!( connection.state(), WebSocketState::Reconnecting );

  // Give the reconnect logic time to run (several retry intervals).
  tokio::time::sleep( Duration::from_secs( 2 ) ).await;

  // By now the connection must be re-established.
  assert_eq!( connection.state(), WebSocketState::Connected );
  assert!( connection.is_connected() );
  let snapshot = connection.get_metrics();
  assert!( snapshot.reconnect_count > 0 );

  connection.disconnect().await?;
  Ok( () )
}
// DISABLED: 2025-11-07 by Development Team
// REASON: Integration test requires WebSocket infrastructure not available in CI
// RE-ENABLE: When WebSocket infrastructure is available for CI testing
// APPROVED: Team Lead
// TRACKING: issue-test-websocket-disabled
/// Queues two chat requests on a connection limited to 100 queued messages,
/// checks the reported queue state, then processes the queue.
#[ tokio::test ]
#[ ignore = "Integration test disabled - requires stable server infrastructure" ]
async fn test_websocket_message_queuing() -> Result< (), Box< dyn std::error::Error > >
{
  let ws_config = WebSocketConfig::new()
    .with_url( "ws://localhost:11434/api/ws" )
    .with_max_queue_size( 100 );
  let client = WebSocketClient::new( ws_config )?;
  let connection = client.connect().await?;

  // Build and queue one streaming request per message body, in order.
  for content in [ "Message 1", "Message 2" ]
  {
    let prompt = ChatMessage
    {
      role: MessageRole::User,
      content: content.to_string(),
      images: None,
      #[ cfg( feature = "tool_calling" ) ]
      tool_calls: None,
    };
    let request = ChatRequest
    {
      model: "llama2".to_string(),
      messages: vec![ prompt ],
      stream: Some( true ),
      options: None,
      #[ cfg( feature = "tool_calling" ) ]
      tools: None,
      #[ cfg( feature = "tool_calling" ) ]
      tool_messages: None,
    };
    connection.queue_message( request ).await?;
  }

  // The queue must report pending work and stay within the configured bound.
  let queue_state = connection.get_queue_info().await;
  assert!( queue_state.pending_messages > 0 );
  assert!( queue_state.size <= 100 );

  // Flush whatever was queued before closing the connection.
  connection.process_queue().await?;
  connection.disconnect().await?;
  Ok( () )
}
// DISABLED: 2025-11-07 by Development Team
// REASON: Integration test requires WebSocket infrastructure not available in CI
// RE-ENABLE: When WebSocket infrastructure is available for CI testing
// APPROVED: Team Lead
// TRACKING: issue-test-websocket-disabled
/// Checks that `connect_or_fallback` yields a usable HTTP-fallback connection when
/// the configured WebSocket endpoint cannot be reached.
///
/// The WebSocket URL points at `invalid-endpoint`, while the HTTP fallback points
/// at `http://localhost:11434`. The resulting connection must report
/// `ConnectionType::HttpFallback`, be connected, and accept a streaming chat request.
#[ tokio::test ]
#[ ignore = "Integration test disabled - requires stable server infrastructure" ]
async fn test_websocket_fallback_to_http() -> Result< (), Box< dyn std::error::Error > >
{
  // Deliberately unreachable WebSocket endpoint with an HTTP fallback configured.
  let config = WebSocketConfig::new()
    .with_url( "ws://invalid-endpoint:11434/api/ws" )
    .with_fallback_to_http( true )
    .with_http_fallback( Some( "http://localhost:11434" ) );
  let websocket_client = WebSocketClient::new( config )?;

  // `expect` reports the underlying error if the fallback fails, which a bare
  // `assert!( result.is_ok() )` ahead of `unwrap()` would have hidden.
  let connection = websocket_client
    .connect_or_fallback()
    .await
    .expect( "connect_or_fallback should succeed through the HTTP fallback" );

  // The connection must be served by the HTTP fallback, not by WebSocket.
  assert_eq!( connection.connection_type(), ConnectionType::HttpFallback );
  assert!( connection.is_connected() );

  // Streaming must still be available over the fallback transport.
  let chat_request = ChatRequest
  {
    model: "llama2".to_string(),
    messages: vec![ ChatMessage
    {
      role: MessageRole::User,
      content: "Test fallback".to_string(),
      images: None,
      #[ cfg( feature = "tool_calling" ) ]
      tool_calls: None,
    } ],
    stream: Some( true ),
    options: None,
    #[ cfg( feature = "tool_calling" ) ]
    tools: None,
    #[ cfg( feature = "tool_calling" ) ]
    tool_messages: None,
  };
  let stream_result = connection.stream_chat( chat_request ).await;
  assert!( stream_result.is_ok() );

  connection.disconnect().await?;
  Ok( () )
}
/// Exercises the connection pool: opens three connections to distinct URLs,
/// checks the pool statistics, requests the first URL again and checks that the
/// active connection count is unchanged.
#[ tokio::test ]
async fn test_websocket_connection_pooling() -> Result< (), Box< dyn std::error::Error > >
{
  let pool = WebSocketPool::new
  (
    WebSocketPoolConfig::new()
      .with_max_connections( 5 )
      .with_connection_timeout( Duration::from_secs( 10 ) )
  );

  // Open one pooled connection per distinct endpoint.
  let mut opened = Vec::new();
  for i in 0..3
  {
    let url = format!( "ws://localhost:11434/api/ws/conn_{i}" );
    let endpoint_config = WebSocketConfig::new().with_url( url );
    let pooled = pool.get_or_create_connection( endpoint_config ).await?;
    assert!( pooled.is_connected() );
    opened.push( pooled );
  }

  // All three must be active and the pool must respect its maximum size.
  let stats_after_open = pool.get_statistics().await;
  assert_eq!( stats_after_open.active_connections, 3 );
  assert!( stats_after_open.total_connections <= 5 );

  // Asking for an already-pooled endpoint must hand back a live connection...
  let repeat_config = WebSocketConfig::new()
    .with_url( "ws://localhost:11434/api/ws/conn_0" );
  let reused_connection = pool.get_or_create_connection( repeat_config ).await?;
  assert!( reused_connection.is_connected() );

  // ...without growing the number of active connections.
  let stats_after_reuse = pool.get_statistics().await;
  assert_eq!( stats_after_reuse.active_connections, 3 );

  // Tear down every connection opened in the loop above.
  for pooled in opened
  {
    pooled.disconnect().await?;
  }
  Ok( () )
}
/// Connects with authentication configured and checks that the connection
/// reports itself as authenticated and accepts a streaming chat request.
#[ tokio::test ]
async fn test_websocket_authentication() -> Result< (), Box< dyn std::error::Error > >
{
  // NOTE(review): the token given to `with_auth_token` differs from the one inside
  // `BearerToken` - confirm which of the two the client is expected to send.
  let bearer = WebSocketAuthMethod::BearerToken( "test-token".to_string() );
  let ws_config = WebSocketConfig::new()
    .with_url( "ws://localhost:11434/api/ws" )
    .with_auth_token( Some( "test-auth-token".to_string() ) )
    .with_auth_method( bearer );
  let client = WebSocketClient::new( ws_config )?;
  let connection = client.connect().await?;

  // Both authentication views must agree that the connection is authenticated.
  assert!( connection.is_authenticated() );
  assert_eq!( connection.auth_status(), AuthStatus::Authenticated );

  // Streaming over the authenticated connection must be accepted.
  let prompt = ChatMessage
  {
    role: MessageRole::User,
    content: "Authenticated message".to_string(),
    images: None,
    #[ cfg( feature = "tool_calling" ) ]
    tool_calls: None,
  };
  let chat_request = ChatRequest
  {
    model: "llama2".to_string(),
    messages: vec![ prompt ],
    stream: Some( true ),
    options: None,
    #[ cfg( feature = "tool_calling" ) ]
    tools: None,
    #[ cfg( feature = "tool_calling" ) ]
    tool_messages: None,
  };
  let stream_outcome = connection.stream_chat( chat_request ).await;
  assert!( stream_outcome.is_ok() );

  connection.disconnect().await?;
  Ok( () )
}
/// Connects with compression enabled at level 6, sends a 1000-character prompt
/// and checks that the connection metrics report traffic and a positive
/// compression ratio.
#[ tokio::test ]
async fn test_websocket_compression() -> Result< (), Box< dyn std::error::Error > >
{
  let ws_config = WebSocketConfig::new()
    .with_url( "ws://localhost:11434/api/ws" )
    .with_compression( true )
    .with_compression_level( 6 );
  let client = WebSocketClient::new( ws_config )?;
  let connection = client.connect().await?;

  // The connection must reflect the compression setting it was configured with.
  assert!( connection.is_compression_enabled() );

  // A long, highly repetitive payload gives compression something to work on.
  let prompt = ChatMessage
  {
    role: MessageRole::User,
    content: "x".repeat( 1000 ),
    images: None,
    #[ cfg( feature = "tool_calling" ) ]
    tool_calls: None,
  };
  let chat_request = ChatRequest
  {
    model: "llama2".to_string(),
    messages: vec![ prompt ],
    stream: Some( true ),
    options: None,
    #[ cfg( feature = "tool_calling" ) ]
    tools: None,
    #[ cfg( feature = "tool_calling" ) ]
    tool_messages: None,
  };
  let stream_outcome = connection.stream_chat( chat_request ).await;
  assert!( stream_outcome.is_ok() );

  // Traffic in both directions and a positive ratio must have been recorded.
  let snapshot = connection.get_metrics();
  assert!( snapshot.bytes_sent > 0 );
  assert!( snapshot.bytes_received > 0 );
  assert!( snapshot.compression_ratio > 0.0 );

  connection.disconnect().await?;
  Ok( () )
}
/// Runs three chat streams concurrently over one shared WebSocket connection.
///
/// Each spawned task reads up to two streamed responses and returns how many it
/// received. Every task must finish without error and have received at least one
/// response.
///
/// # Errors
///
/// Returns an error if the client cannot be built, connecting or disconnecting
/// fails, or a spawned task cannot be joined.
#[ tokio::test ]
async fn test_websocket_concurrent_operations() -> Result< (), Box< dyn std::error::Error > >
{
  let config = WebSocketConfig::new()
    .with_url( "ws://localhost:11434/api/ws" )
    .with_max_concurrent_streams( 5 );
  let websocket_client = WebSocketClient::new( config )?;
  // Shared through `Arc` so that every spawned task owns its own handle.
  let connection = Arc::new( websocket_client.connect().await? );

  // Start the concurrent streaming operations.
  let mut handles = Vec::new();
  for i in 0..3
  {
    let task_connection = Arc::clone( &connection );
    let handle = tokio::spawn( async move
    {
      let chat_request = ChatRequest
      {
        model: "llama2".to_string(),
        messages: vec![ ChatMessage
        {
          role: MessageRole::User,
          content: format!( "Concurrent message {i}" ),
          images: None,
          #[ cfg( feature = "tool_calling" ) ]
          tool_calls: None,
        } ],
        stream: Some( true ),
        options: None,
        #[ cfg( feature = "tool_calling" ) ]
        tools: None,
        #[ cfg( feature = "tool_calling" ) ]
        tool_messages: None,
      };
      let mut stream = task_connection.stream_chat( chat_request ).await?;
      let mut message_count = 0;
      while let Some( result ) = stream.next().await
      {
        match result
        {
          Ok( _ ) =>
          {
            message_count += 1;
            // Two responses per task are enough to prove the stream is live.
            if message_count >= 2
            {
              break;
            }
          },
          // The explicit `return Err( e )` also fixes the task's error type.
          Err( e ) => return Err( e ),
        }
      }
      Ok( message_count )
    } );
    handles.push( handle );
  }

  // Wait for every task; `?` surfaces a task that could not be joined.
  for handle in handles
  {
    let outcome = handle.await?;
    // `expect` prints the stream error on failure, whereas asserting `is_ok()`
    // first would fail without showing what went wrong.
    let streamed = outcome.expect( "concurrent stream should complete without error" );
    assert!( streamed > 0 );
  }

  connection.disconnect().await?;
  Ok( () )
}
/// Checks resilient error handling: an invalid request must be rejected without
/// closing the connection, and a simulated network error must show up in the
/// recovery status as both an error and a recovery attempt.
#[ tokio::test ]
async fn test_websocket_error_handling() -> Result< (), Box< dyn std::error::Error > >
{
  let ws_config = WebSocketConfig::new()
    .with_url( "ws://localhost:11434/api/ws" )
    .with_error_handling( WebSocketErrorHandling::Resilient );
  let client = WebSocketClient::new( ws_config )?;
  let connection = client.connect().await?;

  // A request with an empty model name and no messages is expected to be refused.
  let malformed_request = ChatRequest
  {
    model: String::new(),
    messages: vec![],
    stream: Some( true ),
    options: None,
    #[ cfg( feature = "tool_calling" ) ]
    tools: None,
    #[ cfg( feature = "tool_calling" ) ]
    tool_messages: None,
  };
  let rejected = connection.stream_chat( malformed_request ).await;
  assert!( rejected.is_err() );

  // The rejection must not have torn the connection down.
  assert!( connection.is_connected() );

  // Inject a network error and leave half a second for recovery to start.
  connection.simulate_network_error().await?;
  tokio::time::sleep( Duration::from_millis( 500 ) ).await;

  let recovery = connection.get_recovery_status();
  assert!( recovery.error_count > 0 );
  assert!( recovery.recovery_attempts > 0 );

  connection.disconnect().await?;
  Ok( () )
}
// DISABLED: 2025-11-07 by Development Team
// REASON: Integration test requires WebSocket infrastructure not available in CI
// RE-ENABLE: When WebSocket infrastructure is available for CI testing
// APPROVED: Team Lead
// TRACKING: issue-test-websocket-disabled
#[ tokio::test ]
#[ ignore = "Integration test disabled - requires stable server infrastructure" ]
async fn test_websocket_state_monitoring() -> Result< (), Box< dyn std::error::Error > >
{
let config = WebSocketConfig::new()
.with_url( "ws://localhost:11434/api/ws" );
let websocket_client = WebSocketClient::new( config )?;
let connection = websocket_client.connect().await?;
let state_changes = Arc::new( Mutex::new( Vec::new() ) );
let state_changes_clone = state_changes.clone();
// Subscribe to state change events
let _ = connection.on_state_change( move | new_state |
{
let state_changes = state_changes_clone.clone();
tokio::spawn( async move
{
state_changes.lock().await.push( ( WebSocketState::Disconnected, new_state ) );
} );
} );
// Trigger state changes
connection.disconnect().await?;
let reconnect_result = connection.reconnect().await;
assert!( reconnect_result.is_ok() );
// Wait for state change events
tokio::time::sleep( Duration::from_millis( 100 ) ).await;
let recorded_changes = state_changes.lock().await;
assert!( !recorded_changes.is_empty() );
// Should have recorded disconnect and reconnect events
assert!( recorded_changes.iter().any( | ( _, new_state ) | *new_state == WebSocketState::Disconnected ) );
assert!( recorded_changes.iter().any( | ( _, new_state ) | *new_state == WebSocketState::Connected ) );
connection.disconnect().await?;
Ok( () )
}
}
#[ cfg( feature = "websocket_streaming" ) ]
mod unit_tests
{
use super::*;
/// Checks that values set through the `WebSocketConfig` builder are returned
/// unchanged by the matching accessors.
#[ test ]
fn test_websocket_config_creation()
{
  let url = "ws://localhost:11434/api/ws";
  let timeout = Duration::from_secs( 10 );

  let built = WebSocketConfig::new()
    .with_url( url )
    .with_timeout( timeout )
    .with_compression( true )
    .with_auto_reconnect( true );

  // Every accessor must echo what the builder was given.
  assert_eq!( built.url(), url );
  assert_eq!( built.timeout(), timeout );
  assert!( built.compression_enabled() );
  assert!( built.auto_reconnect_enabled() );
}
/// Checks that every `WebSocketState` variant has non-empty `Debug` and `Display`
/// output and that distinct variants compare as unequal.
#[ test ]
fn test_websocket_state_enum()
{
  let all_states =
  [
    WebSocketState::Disconnected,
    WebSocketState::Connecting,
    WebSocketState::Connected,
    WebSocketState::Reconnecting,
    WebSocketState::Error,
  ];
  for state in &all_states
  {
    // Both formatting traits must produce some text for each variant.
    let debug_text = format!( "{state:?}" );
    let display_text = state.to_string();
    assert!( !debug_text.is_empty() );
    assert!( !display_text.is_empty() );
  }

  // Different variants must never compare equal.
  assert_ne!( WebSocketState::Disconnected, WebSocketState::Connected );
  assert_ne!( WebSocketState::Connecting, WebSocketState::Reconnecting );
}
/// Checks that every `WebSocketAuthMethod` variant can be constructed and has
/// non-empty `Debug` output.
#[ test ]
fn test_websocket_auth_methods()
{
  let every_method =
  [
    WebSocketAuthMethod::None,
    WebSocketAuthMethod::BearerToken( "test-token".to_string() ),
    WebSocketAuthMethod::ApiKey( "api-key-123".to_string() ),
    WebSocketAuthMethod::Custom( "custom-auth".to_string() ),
  ];
  for method in &every_method
  {
    let debug_text = format!( "{method:?}" );
    assert!( !debug_text.is_empty() );
  }
}
/// Checks that the `Display` output of each `WebSocketError` variant contains
/// the expected fragment of text.
#[ test ]
fn test_websocket_error_types()
{
  let connection_error = WebSocketError::ConnectionFailed( "Connection refused".to_string() );
  let auth_error = WebSocketError::AuthenticationFailed( "Bearer token failed".to_string() );
  let stream_error = WebSocketError::StreamingError
  {
    message: "Stream interrupted".to_string(),
    code: Some( 1000 ),
  };

  // Render each error once, then look for the expected fragment.
  let connection_text = connection_error.to_string();
  assert!( connection_text.contains( "refused" ) );

  let auth_text = auth_error.to_string();
  assert!( auth_text.contains( "Authentication" ) );

  let stream_text = stream_error.to_string();
  assert!( stream_text.contains( "Stream" ) );
}
/// Records one sent message, one received message, one heartbeat and one
/// reconnect on fresh `WebSocketMetrics`, then checks the resulting counters.
#[ test ]
fn test_websocket_metrics_calculation()
{
  let mut tracker = WebSocketMetrics::new();

  // One event of each kind.
  tracker.record_message_sent( 100 );
  tracker.record_message_received( 150 );
  tracker.record_heartbeat();
  tracker.record_reconnect();

  // Byte counters must match the sizes passed in; event counters must be one.
  assert_eq!( tracker.bytes_sent, 100 );
  assert_eq!( tracker.bytes_received, 150 );
  assert_eq!( tracker.heartbeat_count, 1 );
  assert_eq!( tracker.reconnect_count, 1 );

  // Uptime must be strictly positive.
  assert!( tracker.uptime > Duration::ZERO );
}
/// Checks that values set through the `WebSocketPoolConfig` builder are returned
/// unchanged by the matching accessors.
#[ test ]
fn test_connection_pool_configuration()
{
  let connect_limit = Duration::from_secs( 30 );
  let idle_limit = Duration::from_secs( 300 );

  let built = WebSocketPoolConfig::new()
    .with_max_connections( 10 )
    .with_connection_timeout( connect_limit )
    .with_idle_timeout( idle_limit );

  assert_eq!( built.max_connections(), 10 );
  assert_eq!( built.connection_timeout(), connect_limit );
  assert_eq!( built.idle_timeout(), idle_limit );
}
/// Exercises a `MessageQueue` of capacity 5: initial state, pushing, popping,
/// and rejecting pushes once the queue is full.
///
/// The overflow phase checks both sides of the capacity boundary: pushes that
/// still fit must succeed and pushes beyond capacity must fail.
#[ test ]
fn test_message_queue_operations()
{
  let queue = MessageQueue::new( 5 );
  assert_eq!( queue.capacity(), 5 );
  assert_eq!( queue.len(), 0 );
  assert!( queue.is_empty() );

  // Add messages to the queue.
  for i in 0..3
  {
    let message = WebSocketMessage::Text( format!( "Message {i}" ) );
    queue.push( &message ).expect( "Should be able to push message" );
  }
  assert_eq!( queue.len(), 3 );
  assert!( !queue.is_empty() );

  // Pop one message from the queue.
  let message = queue.pop();
  assert!( message.is_some() );
  assert_eq!( queue.len(), 2 );

  // Overflow: with 2 of 5 slots in use there are 3 free slots, so pushes 0..=2
  // must be accepted and every later push must be rejected.
  for i in 0..10
  {
    let message = WebSocketMessage::Text( format!( "Overflow {i}" ) );
    let result = queue.push( &message );
    assert_eq!
    (
      result.is_err(),
      i >= 3,
      "unexpected push outcome for overflow message {i}"
    );
  }

  // Rejected pushes must not change the length: the queue is exactly full.
  assert_eq!( queue.len(), 5 );
}
}