mod common;
use bytes::Bytes;
use common::setup_test_server;
use futures::{SinkExt, StreamExt};
use serde_json::Value;
use std::time::Duration;
use tokio::time::timeout;
use tokio_tungstenite::{connect_async, tungstenite::Message};
/// Connects to `/events/stream` and checks that the first frame received is a
/// text frame containing JSON with `type == "welcome"` and a string `message`.
#[tokio::test]
async fn test_websocket_connection() {
    let (_client, _temp_dir, server) = setup_test_server().await;
    let endpoint = format!("ws://{}/events/stream", server.addr);

    let (socket, _handshake) = connect_async(&endpoint)
        .await
        .expect("Failed to connect to WebSocket");
    let (_sink, mut incoming) = socket.split();

    // Bound the wait so a silent server fails the test instead of hanging it.
    let first_frame = timeout(Duration::from_secs(2), incoming.next())
        .await
        .expect("Timeout waiting for welcome")
        .expect("No message received")
        .expect("Error receiving message");

    let Message::Text(payload) = first_frame else {
        panic!("Expected text message");
    };
    let json: Value = serde_json::from_str(&payload).expect("Invalid JSON");
    assert_eq!(json["type"], "welcome");
    assert!(json["message"].as_str().is_some());
}
/// Subscribes without filters, then creates a bucket and uploads an object
/// through the client, expecting a `bucket-created` event followed by an
/// `object-created` event on the stream.
#[tokio::test]
async fn test_websocket_event_broadcast() {
    let (client, _temp_dir, server) = setup_test_server().await;
    let endpoint = format!("ws://{}/events/stream", server.addr);
    let (socket, _handshake) = connect_async(&endpoint).await.expect("Failed to connect");
    let (_sink, mut incoming) = socket.split();

    // Consume and ignore the initial frame (expected to be the welcome message).
    let _greeting = incoming.next().await;

    client
        .create_bucket()
        .bucket("test-ws-bucket")
        .send()
        .await
        .expect("Failed to create bucket");

    let bucket_frame = timeout(Duration::from_secs(2), incoming.next())
        .await
        .expect("Timeout waiting for bucket event")
        .expect("No message received")
        .expect("Error receiving message");
    match bucket_frame {
        Message::Text(raw) => {
            let event: Value = serde_json::from_str(&raw).expect("Invalid JSON");
            assert_eq!(event["eventType"], "bucket-created");
            assert_eq!(event["bucket"], "test-ws-bucket");
        }
        _ => panic!("Expected text message"),
    }

    let body = Bytes::from_static(b"test content");
    client
        .put_object()
        .bucket("test-ws-bucket")
        .key("test-file.txt")
        .body(body.into())
        .send()
        .await
        .expect("Failed to put object");

    let object_frame = timeout(Duration::from_secs(2), incoming.next())
        .await
        .expect("Timeout waiting for object event")
        .expect("No message received")
        .expect("Error receiving message");
    match object_frame {
        Message::Text(raw) => {
            let event: Value = serde_json::from_str(&raw).expect("Invalid JSON");
            assert_eq!(event["eventType"], "object-created");
            assert_eq!(event["bucket"], "test-ws-bucket");
            assert_eq!(event["key"], "test-file.txt");
            // Only presence and JSON type are checked for size and etag.
            assert!(event["size"].as_u64().is_some());
            assert!(event["etag"].as_str().is_some());
        }
        _ => panic!("Expected text message"),
    }
}
/// Subscribes with `?bucket=filtered-bucket` and checks two things:
///
/// 1. the welcome message echoes the filter under `filters.bucket`;
/// 2. after creating `other-bucket` and then `filtered-bucket`, the first
///    event delivered is for `filtered-bucket`.
#[tokio::test]
async fn test_websocket_bucket_filter() {
    let (client, _temp_dir, server) = setup_test_server().await;
    let ws_url = format!("ws://{}/events/stream?bucket=filtered-bucket", server.addr);
    let (ws_stream, _response) = connect_async(&ws_url).await.expect("Failed to connect");
    let (_write, mut read) = ws_stream.split();

    // The welcome frame must arrive in bounded time and must be text. If it
    // were missing or of another kind, skipping the check would let the
    // filter-echo assertion pass vacuously.
    let welcome = timeout(Duration::from_secs(2), read.next())
        .await
        .expect("Timeout waiting for welcome")
        .expect("No message received")
        .expect("Error receiving message");
    let Message::Text(text) = welcome else {
        panic!("Expected text message");
    };
    let json: Value = serde_json::from_str(&text).expect("Invalid JSON");
    assert_eq!(json["filters"]["bucket"], "filtered-bucket");

    // Created first on purpose: its event must not reach this subscriber.
    client
        .create_bucket()
        .bucket("other-bucket")
        .send()
        .await
        .expect("Failed to create bucket");
    client
        .create_bucket()
        .bucket("filtered-bucket")
        .send()
        .await
        .expect("Failed to create bucket");

    let event_msg = timeout(Duration::from_secs(2), read.next())
        .await
        .expect("Timeout waiting for event")
        .expect("No message received")
        .expect("Error receiving message");
    let Message::Text(text) = event_msg else {
        panic!("Expected text message");
    };
    let event: Value = serde_json::from_str(&text).expect("Invalid JSON");
    assert_eq!(event["bucket"], "filtered-bucket");
}
/// Subscribes with `?prefix=logs/`, creates a bucket, uploads `data/file.txt`
/// and then `logs/access.log`, and expects the first event after the initial
/// frame to carry the key `logs/access.log`.
#[tokio::test]
async fn test_websocket_prefix_filter() {
    let (client, _temp_dir, server) = setup_test_server().await;
    let endpoint = format!("ws://{}/events/stream?prefix=logs/", server.addr);
    let (socket, _handshake) = connect_async(&endpoint).await.expect("Failed to connect");
    let (_sink, mut incoming) = socket.split();

    // Consume and ignore the initial frame (expected to be the welcome message).
    let _greeting = incoming.next().await;

    client
        .create_bucket()
        .bucket("prefix-test")
        .send()
        .await
        .expect("Failed to create bucket");

    // Upload the non-matching key first, then the matching one, in that order.
    let uploads: [(&str, &'static [u8]); 2] =
        [("data/file.txt", b"data"), ("logs/access.log", b"logs")];
    for (object_key, contents) in uploads {
        client
            .put_object()
            .bucket("prefix-test")
            .key(object_key)
            .body(Bytes::from_static(contents).into())
            .send()
            .await
            .expect("Failed to put object");
    }

    let frame = timeout(Duration::from_secs(2), incoming.next())
        .await
        .expect("Timeout waiting for event")
        .expect("No message received")
        .expect("Error receiving message");
    match frame {
        Message::Text(raw) => {
            let event: Value = serde_json::from_str(&raw).expect("Invalid JSON");
            assert_eq!(event["key"], "logs/access.log");
        }
        _ => panic!("Expected text message"),
    }
}
/// Subscribes with `?events=object-created`, creates a bucket and uploads an
/// object, and expects the first event after the initial frame to have
/// `eventType == "object-created"`.
#[tokio::test]
async fn test_websocket_event_type_filter() {
    let (client, _temp_dir, server) = setup_test_server().await;
    let endpoint = format!("ws://{}/events/stream?events=object-created", server.addr);
    let (socket, _handshake) = connect_async(&endpoint).await.expect("Failed to connect");
    let (_sink, mut incoming) = socket.split();

    // Consume and ignore the initial frame (expected to be the welcome message).
    let _greeting = incoming.next().await;

    client
        .create_bucket()
        .bucket("event-filter-test")
        .send()
        .await
        .expect("Failed to create bucket");

    let body = Bytes::from_static(b"content");
    client
        .put_object()
        .bucket("event-filter-test")
        .key("file.txt")
        .body(body.into())
        .send()
        .await
        .expect("Failed to put object");

    let frame = timeout(Duration::from_secs(2), incoming.next())
        .await
        .expect("Timeout waiting for event")
        .expect("No message received")
        .expect("Error receiving message");
    let Message::Text(raw) = frame else {
        panic!("Expected text message");
    };
    let event: Value = serde_json::from_str(&raw).expect("Invalid JSON");
    assert_eq!(event["eventType"], "object-created");
}
/// Subscribes with `?events=object-created,object-removed`, then uploads and
/// deletes an object, expecting an `object-created` event followed by an
/// `object-removed` event.
///
/// Each received frame must be a text frame; any other frame kind fails the
/// test rather than silently skipping the assertions.
#[tokio::test]
async fn test_websocket_multiple_event_types() {
    let (client, _temp_dir, server) = setup_test_server().await;
    let ws_url = format!(
        "ws://{}/events/stream?events=object-created,object-removed",
        server.addr
    );
    let (ws_stream, _response) = connect_async(&ws_url).await.expect("Failed to connect");
    let (_write, mut read) = ws_stream.split();
    let _welcome = read.next().await;

    client
        .create_bucket()
        .bucket("multi-event-test")
        .send()
        .await
        .expect("Failed to create bucket");
    client
        .put_object()
        .bucket("multi-event-test")
        .key("file.txt")
        .body(Bytes::from_static(b"content").into())
        .send()
        .await
        .expect("Failed to put object");

    let event1 = timeout(Duration::from_secs(2), read.next())
        .await
        .expect("Timeout")
        .expect("No message")
        .expect("Error");
    // A non-text frame here means the event was not delivered as expected.
    let Message::Text(text) = event1 else {
        panic!("Expected text message");
    };
    let event: Value = serde_json::from_str(&text).expect("Invalid JSON");
    assert_eq!(event["eventType"], "object-created");

    client
        .delete_object()
        .bucket("multi-event-test")
        .key("file.txt")
        .send()
        .await
        .expect("Failed to delete object");

    let event2 = timeout(Duration::from_secs(2), read.next())
        .await
        .expect("Timeout")
        .expect("No message")
        .expect("Error");
    let Message::Text(text) = event2 else {
        panic!("Expected text message");
    };
    let event: Value = serde_json::from_str(&text).expect("Invalid JSON");
    assert_eq!(event["eventType"], "object-removed");
}
/// Sends a Ping frame after the initial frame and expects the next frame
/// received to be a Pong carrying the same payload.
///
/// The response must be an error-free `Message::Pong`; a stream error or any
/// other frame kind fails the test.
#[tokio::test]
async fn test_websocket_ping_pong() {
    const PING_PAYLOAD: &[u8] = &[1, 2, 3];

    let (_client, _temp_dir, server) = setup_test_server().await;
    let ws_url = format!("ws://{}/events/stream", server.addr);
    let (ws_stream, _response) = connect_async(&ws_url).await.expect("Failed to connect");
    let (mut write, mut read) = ws_stream.split();
    let _welcome = read.next().await;

    write
        .send(Message::Ping(Bytes::from_static(PING_PAYLOAD)))
        .await
        .expect("Failed to send ping");

    let response = timeout(Duration::from_secs(2), read.next())
        .await
        .expect("Timeout waiting for pong")
        .expect("No message received")
        .expect("Error receiving message");
    match response {
        // RFC 6455 requires a Pong to echo the application data of the Ping.
        Message::Pong(payload) => assert_eq!(&payload[..], PING_PAYLOAD),
        other => panic!("Expected pong message, got {other:?}"),
    }
}
/// Subscribes with `?events=multipart-upload-created,multipart-upload-completed`
/// and drives a one-part multipart upload, expecting a
/// `multipart-upload-created` event after the upload is initiated and a
/// `multipart-upload-completed` event after it is completed.
///
/// Each received frame must be a text frame; any other frame kind fails the
/// test rather than silently skipping the assertions.
#[tokio::test]
async fn test_websocket_multipart_events() {
    let (client, _temp_dir, server) = setup_test_server().await;
    let ws_url = format!(
        "ws://{}/events/stream?events=multipart-upload-created,multipart-upload-completed",
        server.addr
    );
    let (ws_stream, _response) = connect_async(&ws_url).await.expect("Failed to connect");
    let (_write, mut read) = ws_stream.split();
    let _welcome = read.next().await;

    client
        .create_bucket()
        .bucket("multipart-test")
        .send()
        .await
        .expect("Failed to create bucket");
    let upload = client
        .create_multipart_upload()
        .bucket("multipart-test")
        .key("large-file.dat")
        .send()
        .await
        .expect("Failed to create multipart upload");

    let event1 = timeout(Duration::from_secs(2), read.next())
        .await
        .expect("Timeout")
        .expect("No message")
        .expect("Error");
    // A non-text frame here means the event was not delivered as expected.
    let Message::Text(text) = event1 else {
        panic!("Expected text message");
    };
    let event: Value = serde_json::from_str(&text).expect("Invalid JSON");
    assert_eq!(event["eventType"], "multipart-upload-created");
    assert_eq!(event["bucket"], "multipart-test");
    assert_eq!(event["key"], "large-file.dat");

    let upload_id = upload.upload_id().expect("No upload ID");
    let part = client
        .upload_part()
        .bucket("multipart-test")
        .key("large-file.dat")
        .upload_id(upload_id)
        .part_number(1)
        .body(Bytes::from_static(b"part data").into())
        .send()
        .await
        .expect("Failed to upload part");

    // Completion lists the single uploaded part with the ETag returned for it.
    client
        .complete_multipart_upload()
        .bucket("multipart-test")
        .key("large-file.dat")
        .upload_id(upload_id)
        .multipart_upload(
            aws_sdk_s3::types::CompletedMultipartUpload::builder()
                .parts(
                    aws_sdk_s3::types::CompletedPart::builder()
                        .part_number(1)
                        .e_tag(part.e_tag().expect("No ETag"))
                        .build(),
                )
                .build(),
        )
        .send()
        .await
        .expect("Failed to complete multipart upload");

    let event2 = timeout(Duration::from_secs(2), read.next())
        .await
        .expect("Timeout")
        .expect("No message")
        .expect("Error");
    let Message::Text(text) = event2 else {
        panic!("Expected text message");
    };
    let event: Value = serde_json::from_str(&text).expect("Invalid JSON");
    assert_eq!(event["eventType"], "multipart-upload-completed");
}