use std::collections::VecDeque;
use std::sync::Arc;
use async_trait::async_trait;
#[cfg(feature = "bebop")]
use bebop::Record;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio_tungstenite::tungstenite::Message;
#[cfg(feature = "bebop")]
use crate::generated::schema::{Data, ServerConnectInfo};
#[cfg(feature = "bebop")]
use crate::helpers::common::get_setting_by_key;
#[cfg(not(feature = "bebop"))]
use crate::helpers::common::remove_setting;
use crate::{
helpers::{
common::set_setting, get_internal_websocket::wrap_get_internal_websocket,
get_outer_websocket::wrap_get_outer_websocket, metrics::Metrics,
},
log_debug, log_error, AtomicWebsocketType, Settings,
};
use crate::helpers::traits::date_time::now;
use super::{
common::make_disconnect_message,
internal_client::ClientOptions,
retry::ExponentialBackoff,
types::{save_key, RwServerSender, DB},
};
async fn persist_connection_info(db: DB, server_ip: &str) {
#[cfg(feature = "bebop")]
let value = {
let port = server_ip.split(':').nth(1).unwrap_or("");
let data = ServerConnectInfo { server_ip, port };
let mut buf = Vec::new();
if let Err(e) = data.serialize(&mut buf) {
log_error!("Failed to serialize ServerConnectInfo: {:?}", e);
return;
}
buf
};
#[cfg(not(feature = "bebop"))]
let value = server_ip.as_bytes().to_vec();
if let Err(e) = set_setting(
db,
Settings {
key: save_key::SERVER_CONNECT_INFO.to_owned(),
value,
},
)
.await
{
log_error!("Failed to persist connection info: {:?}", e);
}
}
#[derive(Clone, Debug, PartialEq)]
#[non_exhaustive]
pub enum SenderStatus {
Start,
Connecting,
Connected,
Disconnected,
Reconnecting,
}
pub struct ServerSender {
sx: Option<mpsc::Sender<Message>>,
pub db: DB,
pub server_sender: Option<RwServerSender>,
pub server_ip: String,
pub server_received_times: i64,
status_tx: Sender<SenderStatus>,
status_rx: Option<Receiver<SenderStatus>>,
handle_message_tx: Sender<Vec<u8>>,
handle_message_rx: Option<Receiver<Vec<u8>>>,
pub options: ClientOptions,
pub is_try_connect: bool,
pub metrics: Arc<Metrics>,
spillover: std::sync::Mutex<VecDeque<Vec<u8>>>,
}
impl ServerSender {
pub fn new(db: DB, server_ip: String, options: ClientOptions) -> Self {
let (status_tx, status_rx) = mpsc::channel(options.status_buffer_size);
let (handle_message_tx, handle_message_rx) = mpsc::channel(options.handler_buffer_size);
Self {
sx: None,
db,
server_sender: None,
server_ip,
server_received_times: 0,
status_tx,
status_rx: Some(status_rx),
handle_message_tx,
handle_message_rx: Some(handle_message_rx),
options,
is_try_connect: false,
metrics: Arc::new(Metrics::new()),
spillover: std::sync::Mutex::new(VecDeque::new()),
}
}
pub fn get_status_receiver(&mut self) -> Option<Receiver<SenderStatus>> {
self.status_rx.take()
}
pub fn get_handle_message_receiver(&mut self) -> Option<Receiver<Vec<u8>>> {
self.handle_message_rx.take()
}
pub fn regist(&mut self, server_sender: RwServerSender) {
self.server_sender = Some(server_sender);
}
fn sx_drop(&mut self) {
if let Some(sender) = self.sx.take() {
let prev_server_ip = self.server_ip.clone();
tokio::spawn(async move {
let _ = sender.send(make_disconnect_message(&prev_server_ip)).await;
sender.closed().await;
});
}
}
pub fn add(&mut self, sx: mpsc::Sender<Message>, server_ip: &str) {
self.sx_drop();
self.sx = Some(sx);
self.server_ip = server_ip.into();
}
pub fn remove_ip(&mut self) {
if !self.server_ip.is_empty() {
self.sx_drop();
self.server_ip = "".into();
}
}
pub fn send_status(&self, status: SenderStatus) -> bool {
if self.status_tx.try_send(status).is_err() {
log_debug!("Status channel full, dropping status update");
self.metrics.inc_messages_dropped();
return false;
}
true
}
pub fn send_handle_message(&self, data: Vec<u8>) {
let mut spillover = self.spillover.lock().unwrap_or_else(|e| e.into_inner());
while let Some(data) = spillover.front().cloned() {
match self.handle_message_tx.try_send(data) {
Ok(()) => {
spillover.pop_front();
}
Err(_) => break,
}
}
if spillover.is_empty() {
match self.handle_message_tx.try_send(data) {
Ok(()) => (),
Err(tokio::sync::mpsc::error::TrySendError::Full(data)) => {
if spillover.len() < self.options.spillover_buffer_size {
spillover.push_back(data);
} else {
self.metrics.inc_messages_dropped();
}
}
Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
log_error!("Handle message channel closed");
}
}
} else {
if spillover.len() < self.options.spillover_buffer_size {
spillover.push_back(data);
} else {
self.metrics.inc_messages_dropped();
}
}
}
#[cfg(test)]
fn drain_spillover(&self) {
let mut spillover = self.spillover.lock().unwrap_or_else(|e| e.into_inner());
while let Some(data) = spillover.front().cloned() {
match self.handle_message_tx.try_send(data) {
Ok(()) => {
spillover.pop_front();
}
Err(_) => break,
}
}
}
}
#[async_trait]
pub trait ServerSenderTrait {
async fn add(&self, sx: mpsc::Sender<Message>, server_ip: &str);
async fn send_status(&self, status: SenderStatus);
#[cfg(feature = "bebop")]
async fn send_handle_message(&self, data: Data<'_>);
#[cfg(not(feature = "bebop"))]
async fn send_handle_message(&self, data: Vec<u8>);
async fn get_status_receiver(&self) -> Option<Receiver<SenderStatus>>;
async fn get_handle_message_receiver(&self) -> Option<Receiver<Vec<u8>>>;
async fn send(&self, message: Message);
async fn regist(&mut self, server_sender: RwServerSender);
async fn is_valid_server_ip(&self) -> bool;
async fn remove_ip(&self);
async fn remove_ip_if_valid_server_ip(&self, server_ip: &str);
async fn write_received_times(&self);
async fn is_need_connect(&self) -> bool;
}
#[async_trait]
impl ServerSenderTrait for RwServerSender {
async fn add(&self, sx: mpsc::Sender<Message>, server_ip: &str) {
let db = {
let mut guard = self.write().await;
guard.add(sx, server_ip);
guard.db.clone()
};
log_debug!("set start server_ip: {:?}", server_ip);
persist_connection_info(db, server_ip).await;
}
async fn get_status_receiver(&self) -> Option<Receiver<SenderStatus>> {
self.write().await.get_status_receiver()
}
async fn get_handle_message_receiver(&self) -> Option<Receiver<Vec<u8>>> {
self.write().await.get_handle_message_receiver()
}
async fn send_status(&self, status: SenderStatus) {
self.read().await.send_status(status);
}
#[cfg(feature = "bebop")]
async fn send_handle_message(&self, data: Data<'_>) {
let mut buf = Vec::with_capacity(256);
if let Err(e) = data.serialize(&mut buf) {
log_error!("Failed to serialize Data: {:?}", e);
return;
}
let guard = self.read().await;
guard.metrics.inc_messages_received();
guard.send_handle_message(buf);
}
#[cfg(not(feature = "bebop"))]
async fn send_handle_message(&self, data: Vec<u8>) {
let guard = self.read().await;
guard.metrics.inc_messages_received();
guard.send_handle_message(data);
}
async fn send(&self, message: Message) {
let (sender, status_tx, options, server_sender_ref, db, server_ip, metrics) = {
let guard = self.read().await;
let Some(sx) = guard.sx.as_ref() else {
return;
};
(
sx.clone(),
guard.status_tx.clone(),
guard.options.clone(),
guard.server_sender.clone(),
guard.db.clone(),
guard.server_ip.clone(),
guard.metrics.clone(),
)
};
let limit_count = match options.retry_seconds > 5 {
true => 5,
false => match options.retry_seconds {
0 | 1 => 1,
_ => (options.retry_seconds - 1) as u32,
},
};
let mut backoff = ExponentialBackoff::new(50, 1, limit_count);
loop {
match sender.send(message.clone()).await {
Ok(_) => {
metrics.inc_messages_sent();
return;
}
Err(e) => {
log_error!(
"Send error (channel closed, attempt {}): {:?}",
backoff.count() + 1,
e
);
if !backoff.wait().await {
metrics.inc_send_errors();
let _ = status_tx.try_send(SenderStatus::Disconnected);
if let Some(ref ss) = server_sender_ref {
match options.atomic_websocket_type {
AtomicWebsocketType::Internal => {
tokio::spawn(wrap_get_internal_websocket(
db,
ss.clone(),
server_ip,
options,
));
}
AtomicWebsocketType::External => {
tokio::spawn(wrap_get_outer_websocket(db, ss.clone(), options));
}
}
}
return;
}
}
}
}
}
async fn regist(&mut self, server_sender: RwServerSender) {
self.write().await.regist(server_sender);
}
async fn is_valid_server_ip(&self) -> bool {
let clone = self.read().await;
let result = !clone.server_ip.is_empty()
&& clone.server_received_times
+ (match clone.options.retry_seconds {
0 => 1,
_ => clone.options.retry_seconds as i64,
} * 2)
> now().timestamp();
drop(clone);
result
}
async fn remove_ip(&self) {
self.write().await.remove_ip();
}
async fn remove_ip_if_valid_server_ip(&self, server_ip: &str) {
let db = self.read().await.db.clone();
self.remove_ip().await;
#[cfg(feature = "bebop")]
{
let server_connect_info = match get_setting_by_key(
db.clone(),
save_key::SERVER_CONNECT_INFO.to_owned(),
)
.await
{
Ok(info) => info,
Err(error) => {
log_error!("Failed to get server_connect_info {error:?}");
return;
}
};
let Some(server_connect_info) = server_connect_info else {
return;
};
let Ok(mut info) = ServerConnectInfo::deserialize(&server_connect_info.value) else {
log_error!("Failed to deserialize ServerConnectInfo");
return;
};
let stored_ip_normalized = info
.server_ip
.trim_start_matches("ws://")
.trim_start_matches("wss://")
.split(':')
.next()
.unwrap_or(info.server_ip);
let server_ip_normalized = server_ip
.trim_start_matches("ws://")
.trim_start_matches("wss://")
.split(':')
.next()
.unwrap_or(server_ip);
if !stored_ip_normalized.is_empty()
&& !server_ip_normalized.is_empty()
&& stored_ip_normalized != server_ip_normalized
{
log_debug!(
"IP mismatch: stored={}, attempted={}. Force resetting.",
stored_ip_normalized,
server_ip_normalized
);
}
info.server_ip = "";
let mut value = Vec::new();
if let Err(e) = info.serialize(&mut value) {
log_error!("Failed to serialize ServerConnectInfo: {:?}", e);
return;
}
if let Err(e) = set_setting(
db,
Settings {
key: save_key::SERVER_CONNECT_INFO.to_owned(),
value,
},
)
.await
{
log_error!("Failed to update connection info: {:?}", e);
}
}
#[cfg(not(feature = "bebop"))]
{
let _ = server_ip;
if let Err(e) = remove_setting(db, save_key::SERVER_CONNECT_INFO.to_owned()).await {
log_error!("Failed to remove connection info: {:?}", e);
}
}
}
async fn write_received_times(&self) {
self.write().await.server_received_times = now().timestamp();
}
async fn is_need_connect(&self) -> bool {
!self.read().await.is_try_connect
}
}
#[cfg(feature = "bebop")]
#[test]
fn get_sercer_connect_info() {
let binary: Vec<u8> = vec![0, 0, 0, 0, 5, 0, 0, 0, 49, 54, 50, 53, 48];
let data = ServerConnectInfo::deserialize(&binary).unwrap();
println!("{:?}", data);
}
#[cfg(test)]
mod tests {
use super::*;
#[cfg(not(feature = "native-db"))]
use std::sync::Arc;
#[cfg(not(feature = "native-db"))]
use tokio::sync::{Mutex, RwLock};
#[cfg(not(feature = "native-db"))]
use crate::helpers::types::InMemoryStorage;
#[cfg(not(feature = "native-db"))]
fn create_test_db() -> DB {
Arc::new(Mutex::new(InMemoryStorage::new()))
}
#[cfg(not(feature = "native-db"))]
fn create_test_options() -> ClientOptions {
ClientOptions::default()
}
#[cfg(feature = "native-db")]
use std::sync::Arc;
#[cfg(feature = "native-db")]
use tokio::sync::Mutex;
#[cfg(feature = "native-db")]
fn create_test_db_native() -> DB {
use native_db::{Builder, Models};
let mut models = Models::new();
models.define::<Settings>().unwrap();
let models: &'static Models = Box::leak(Box::new(models));
let temp = tempfile::NamedTempFile::new().unwrap();
Arc::new(Mutex::new(
Builder::new().create(models, temp.path()).unwrap(),
))
}
#[cfg(feature = "native-db")]
fn create_test_options_native() -> ClientOptions {
ClientOptions::default()
}
#[cfg(not(feature = "native-db"))]
#[test]
fn test_server_sender_new() {
let db = create_test_db();
let sender = ServerSender::new(db, "127.0.0.1:9000".to_string(), create_test_options());
assert!(sender.sx.is_none());
assert_eq!(sender.server_ip, "127.0.0.1:9000");
assert_eq!(sender.server_received_times, 0);
assert!(!sender.is_try_connect);
assert!(sender.server_sender.is_none());
assert!(sender.status_rx.is_some());
assert!(sender.handle_message_rx.is_some());
}
#[cfg(not(feature = "native-db"))]
#[test]
fn test_server_sender_get_status_receiver() {
let db = create_test_db();
let mut sender = ServerSender::new(db, "".to_string(), create_test_options());
let rx = sender.get_status_receiver();
assert!(rx.is_some());
assert!(sender.status_rx.is_none());
}
#[cfg(not(feature = "native-db"))]
#[test]
fn test_server_sender_get_status_receiver_double_call_returns_none() {
let db = create_test_db();
let mut sender = ServerSender::new(db, "".to_string(), create_test_options());
let _rx1 = sender.get_status_receiver();
let rx2 = sender.get_status_receiver();
assert!(rx2.is_none(), "Second call should return None");
}
#[cfg(not(feature = "native-db"))]
#[test]
fn test_server_sender_get_handle_message_receiver() {
let db = create_test_db();
let mut sender = ServerSender::new(db, "".to_string(), create_test_options());
let rx = sender.get_handle_message_receiver();
assert!(rx.is_some());
assert!(sender.handle_message_rx.is_none());
}
#[cfg(not(feature = "native-db"))]
#[test]
fn test_server_sender_get_handle_message_receiver_double_call_returns_none() {
let db = create_test_db();
let mut sender = ServerSender::new(db, "".to_string(), create_test_options());
let _rx1 = sender.get_handle_message_receiver();
let rx2 = sender.get_handle_message_receiver();
assert!(rx2.is_none(), "Second call should return None");
}
#[cfg(not(feature = "native-db"))]
#[test]
fn test_server_sender_regist() {
let db = create_test_db();
let mut sender = ServerSender::new(db.clone(), "".to_string(), create_test_options());
assert!(sender.server_sender.is_none());
let rw_sender = Arc::new(RwLock::new(ServerSender::new(
db,
"".to_string(),
create_test_options(),
)));
sender.regist(rw_sender);
assert!(sender.server_sender.is_some());
}
#[cfg(not(feature = "native-db"))]
#[test]
fn test_server_sender_add_updates_state() {
let db = create_test_db();
let mut sender = ServerSender::new(db, "".to_string(), create_test_options());
assert!(sender.sx.is_none());
assert_eq!(sender.server_ip, "");
let (tx, _rx) = mpsc::channel(8);
sender.add(tx, "192.168.1.100:9000");
assert!(sender.sx.is_some());
assert_eq!(sender.server_ip, "192.168.1.100:9000");
}
#[cfg(not(feature = "native-db"))]
#[tokio::test]
async fn test_server_sender_remove_ip_clears_state() {
let db = create_test_db();
let mut sender =
ServerSender::new(db, "192.168.1.100:9000".to_string(), create_test_options());
let (tx, _rx) = mpsc::channel(8);
sender.add(tx, "192.168.1.100:9000");
assert!(!sender.server_ip.is_empty());
sender.remove_ip();
assert!(sender.server_ip.is_empty());
assert!(sender.sx.is_none());
}
#[cfg(not(feature = "native-db"))]
#[test]
fn test_server_sender_remove_ip_empty_no_panic() {
let db = create_test_db();
let mut sender = ServerSender::new(db, "".to_string(), create_test_options());
sender.remove_ip();
assert!(sender.server_ip.is_empty());
}
#[cfg(not(feature = "native-db"))]
#[tokio::test]
async fn test_server_sender_send_status() {
let db = create_test_db();
let mut sender = ServerSender::new(db, "".to_string(), create_test_options());
let mut rx = sender.get_status_receiver().expect("receiver");
sender.send_status(SenderStatus::Connected);
let status = rx.recv().await;
assert!(status.is_some());
assert_eq!(status.unwrap(), SenderStatus::Connected);
}
#[cfg(not(feature = "native-db"))]
#[tokio::test]
async fn test_server_sender_send_handle_message() {
let db = create_test_db();
let mut sender = ServerSender::new(db, "".to_string(), create_test_options());
let mut rx = sender.get_handle_message_receiver().expect("receiver");
sender.send_handle_message(vec![1, 2, 3, 4, 5]);
let data = rx.recv().await;
assert!(data.is_some());
assert_eq!(data.unwrap(), vec![1, 2, 3, 4, 5]);
}
#[cfg(not(feature = "native-db"))]
#[tokio::test]
async fn test_spillover_stores_when_channel_full() {
let db = create_test_db();
let mut options = create_test_options();
options.handler_buffer_size = 2; options.spillover_buffer_size = 10;
let mut sender = ServerSender::new(db, "".to_string(), options);
let mut rx = sender.get_handle_message_receiver().expect("receiver");
sender.send_handle_message(vec![1]);
sender.send_handle_message(vec![2]);
sender.send_handle_message(vec![3]);
sender.send_handle_message(vec![4]);
sender.send_handle_message(vec![5]);
let spillover_len = sender.spillover.lock().unwrap().len();
assert_eq!(spillover_len, 3, "spillover에 3개 메시지가 저장되어야 함");
let msg = rx.recv().await.unwrap();
assert_eq!(msg, vec![1]);
sender.send_handle_message(vec![6]);
let spillover_len = sender.spillover.lock().unwrap().len();
assert!(
spillover_len <= 3,
"drain 후 spillover는 3 이하여야 함, actual: {}",
spillover_len
);
let msg = rx.recv().await.unwrap();
assert_eq!(msg, vec![2]);
}
#[cfg(not(feature = "native-db"))]
#[test]
fn test_spillover_drops_when_cap_exceeded() {
let db = create_test_db();
let mut options = create_test_options();
options.handler_buffer_size = 1;
options.spillover_buffer_size = 3; let mut sender = ServerSender::new(db, "".to_string(), options);
let _rx = sender.get_handle_message_receiver().expect("receiver");
sender.send_handle_message(vec![1]);
sender.send_handle_message(vec![2]);
sender.send_handle_message(vec![3]);
sender.send_handle_message(vec![4]);
assert_eq!(sender.spillover.lock().unwrap().len(), 3);
sender.send_handle_message(vec![5]);
sender.send_handle_message(vec![6]);
assert_eq!(sender.spillover.lock().unwrap().len(), 3);
let snapshot = sender.metrics.snapshot();
assert_eq!(
snapshot.messages_dropped, 2,
"cap 초과로 2개 메시지가 drop되어야 함"
);
}
#[cfg(not(feature = "native-db"))]
#[tokio::test]
async fn test_spillover_drain_preserves_order() {
let db = create_test_db();
let mut options = create_test_options();
options.handler_buffer_size = 2;
options.spillover_buffer_size = 100;
let mut sender = ServerSender::new(db, "".to_string(), options);
let mut rx = sender.get_handle_message_receiver().expect("receiver");
for i in 1..=6u8 {
sender.send_handle_message(vec![i]);
}
assert_eq!(sender.spillover.lock().unwrap().len(), 4);
let mut received = Vec::new();
received.push(rx.recv().await.unwrap());
received.push(rx.recv().await.unwrap());
sender.send_handle_message(vec![7]);
while let Ok(msg) = rx.try_recv() {
received.push(msg);
}
for i in 0..received.len().saturating_sub(1) {
assert!(
received[i] <= received[i + 1],
"순서 위반: {:?} 다음에 {:?}",
received[i],
received[i + 1]
);
}
}
#[cfg(not(feature = "native-db"))]
#[test]
fn test_spillover_drain_empty_is_noop() {
let db = create_test_db();
let sender = ServerSender::new(db, "".to_string(), create_test_options());
sender.drain_spillover();
assert_eq!(sender.spillover.lock().unwrap().len(), 0);
}
#[cfg(feature = "native-db")]
#[tokio::test]
async fn test_spillover_stores_when_channel_full_native_db() {
let db = create_test_db_native();
let mut options = create_test_options_native();
options.handler_buffer_size = 2;
options.spillover_buffer_size = 10;
let mut sender = ServerSender::new(db, "".to_string(), options);
let mut rx = sender.get_handle_message_receiver().expect("receiver");
sender.send_handle_message(vec![1]);
sender.send_handle_message(vec![2]);
sender.send_handle_message(vec![3]);
sender.send_handle_message(vec![4]);
sender.send_handle_message(vec![5]);
let spillover_len = sender.spillover.lock().unwrap().len();
assert_eq!(spillover_len, 3, "spillover에 3개 메시지가 저장되어야 함");
let msg = rx.recv().await.unwrap();
assert_eq!(msg, vec![1]);
sender.send_handle_message(vec![6]);
let spillover_len = sender.spillover.lock().unwrap().len();
assert!(
spillover_len <= 3,
"drain 후 spillover는 3 이하여야 함, actual: {}",
spillover_len
);
let msg = rx.recv().await.unwrap();
assert_eq!(msg, vec![2]);
}
#[cfg(feature = "native-db")]
#[test]
fn test_spillover_drops_when_cap_exceeded_native_db() {
let db = create_test_db_native();
let mut options = create_test_options_native();
options.handler_buffer_size = 1;
options.spillover_buffer_size = 3;
let mut sender = ServerSender::new(db, "".to_string(), options);
let _rx = sender.get_handle_message_receiver().expect("receiver");
sender.send_handle_message(vec![1]);
sender.send_handle_message(vec![2]);
sender.send_handle_message(vec![3]);
sender.send_handle_message(vec![4]);
assert_eq!(sender.spillover.lock().unwrap().len(), 3);
sender.send_handle_message(vec![5]);
sender.send_handle_message(vec![6]);
assert_eq!(sender.spillover.lock().unwrap().len(), 3);
let snapshot = sender.metrics.snapshot();
assert_eq!(
snapshot.messages_dropped, 2,
"cap 초과로 2개 메시지가 drop되어야 함"
);
}
#[cfg(feature = "native-db")]
#[tokio::test]
async fn test_spillover_drain_preserves_order_native_db() {
let db = create_test_db_native();
let mut options = create_test_options_native();
options.handler_buffer_size = 2;
options.spillover_buffer_size = 100;
let mut sender = ServerSender::new(db, "".to_string(), options);
let mut rx = sender.get_handle_message_receiver().expect("receiver");
for i in 1..=6u8 {
sender.send_handle_message(vec![i]);
}
assert_eq!(sender.spillover.lock().unwrap().len(), 4);
let mut received = Vec::new();
received.push(rx.recv().await.unwrap());
received.push(rx.recv().await.unwrap());
sender.send_handle_message(vec![7]);
while let Ok(msg) = rx.try_recv() {
received.push(msg);
}
for i in 0..received.len().saturating_sub(1) {
assert!(
received[i] <= received[i + 1],
"순서 위반: {:?} 다음에 {:?}",
received[i],
received[i + 1]
);
}
}
#[cfg(feature = "native-db")]
#[test]
fn test_spillover_drain_empty_is_noop_native_db() {
let db = create_test_db_native();
let sender = ServerSender::new(db, "".to_string(), create_test_options_native());
sender.drain_spillover();
assert_eq!(sender.spillover.lock().unwrap().len(), 0);
}
#[cfg(not(feature = "native-db"))]
#[tokio::test]
async fn test_spillover_full_recovery_all_messages_received() {
let db = create_test_db();
let mut options = create_test_options();
options.handler_buffer_size = 3;
options.spillover_buffer_size = 100;
let mut sender = ServerSender::new(db, "".to_string(), options);
let mut rx = sender.get_handle_message_receiver().expect("receiver");
let total_messages = 20u8;
for i in 1..=total_messages {
sender.send_handle_message(vec![i]);
}
assert_eq!(sender.spillover.lock().unwrap().len(), 17);
assert_eq!(
sender.metrics.snapshot().messages_dropped,
0,
"drop 없어야 함"
);
let mut received = Vec::new();
while let Ok(msg) = rx.try_recv() {
received.push(msg[0]);
}
loop {
sender.drain_spillover();
let mut drained_any = false;
while let Ok(msg) = rx.try_recv() {
received.push(msg[0]);
drained_any = true;
}
let remaining = sender.spillover.lock().unwrap().len();
if remaining == 0 && !drained_any {
break;
}
}
assert_eq!(
received.len(),
total_messages as usize,
"전체 {}개 메시지 모두 수신되어야 함, actual: {}",
total_messages,
received.len()
);
let expected: Vec<u8> = (1..=total_messages).collect();
assert_eq!(
received, expected,
"메시지 순서가 1..={} 이어야 함",
total_messages
);
assert_eq!(
sender.spillover.lock().unwrap().len(),
0,
"spillover 완전 drain"
);
assert_eq!(sender.metrics.snapshot().messages_dropped, 0, "drop 없음");
}
#[cfg(feature = "native-db")]
#[tokio::test]
async fn test_spillover_full_recovery_all_messages_received_native_db() {
let db = create_test_db_native();
let mut options = create_test_options_native();
options.handler_buffer_size = 3;
options.spillover_buffer_size = 100;
let mut sender = ServerSender::new(db, "".to_string(), options);
let mut rx = sender.get_handle_message_receiver().expect("receiver");
let total_messages = 20u8;
for i in 1..=total_messages {
sender.send_handle_message(vec![i]);
}
assert_eq!(sender.spillover.lock().unwrap().len(), 17);
assert_eq!(sender.metrics.snapshot().messages_dropped, 0);
let mut received = Vec::new();
while let Ok(msg) = rx.try_recv() {
received.push(msg[0]);
}
loop {
sender.drain_spillover();
let mut drained_any = false;
while let Ok(msg) = rx.try_recv() {
received.push(msg[0]);
drained_any = true;
}
let remaining = sender.spillover.lock().unwrap().len();
if remaining == 0 && !drained_any {
break;
}
}
assert_eq!(received.len(), total_messages as usize);
let expected: Vec<u8> = (1..=total_messages).collect();
assert_eq!(received, expected);
assert_eq!(sender.spillover.lock().unwrap().len(), 0);
assert_eq!(sender.metrics.snapshot().messages_dropped, 0);
}
#[test]
fn test_sender_status_equality() {
assert_eq!(SenderStatus::Start, SenderStatus::Start);
assert_eq!(SenderStatus::Connected, SenderStatus::Connected);
assert_eq!(SenderStatus::Disconnected, SenderStatus::Disconnected);
assert_ne!(SenderStatus::Start, SenderStatus::Connected);
assert_ne!(SenderStatus::Connected, SenderStatus::Disconnected);
}
#[test]
fn test_sender_status_clone() {
let status = SenderStatus::Connected;
let cloned = status.clone();
assert_eq!(status, cloned);
}
#[test]
fn test_sender_status_debug() {
let status = SenderStatus::Disconnected;
let debug_str = format!("{:?}", status);
assert!(debug_str.contains("Disconnected"));
}
#[cfg(not(feature = "native-db"))]
fn create_rw_server_sender() -> RwServerSender {
let db = create_test_db();
Arc::new(RwLock::new(ServerSender::new(
db,
"".to_string(),
create_test_options(),
)))
}
#[cfg(not(feature = "native-db"))]
#[tokio::test]
async fn test_trait_is_valid_server_ip_empty() {
let sender = create_rw_server_sender();
assert!(!sender.is_valid_server_ip().await);
}
#[cfg(not(feature = "native-db"))]
#[tokio::test]
async fn test_trait_is_valid_server_ip_with_recent_activity() {
let sender = create_rw_server_sender();
{
let mut guard = sender.write().await;
guard.server_ip = "192.168.1.100:9000".to_string();
}
sender.write_received_times().await;
assert!(sender.is_valid_server_ip().await);
}
#[cfg(not(feature = "native-db"))]
#[tokio::test]
async fn test_trait_is_valid_server_ip_with_old_activity() {
let sender = create_rw_server_sender();
{
let mut guard = sender.write().await;
guard.server_ip = "192.168.1.100:9000".to_string();
guard.server_received_times = 0;
}
assert!(!sender.is_valid_server_ip().await);
}
#[cfg(not(feature = "native-db"))]
#[tokio::test]
async fn test_trait_is_need_connect_not_trying() {
let sender = create_rw_server_sender();
{
let mut guard = sender.write().await;
guard.server_ip = "192.168.1.100:9000".to_string();
guard.is_try_connect = false;
}
assert!(sender.is_need_connect().await);
assert!(sender.is_need_connect().await);
}
#[cfg(not(feature = "native-db"))]
#[tokio::test]
async fn test_trait_is_need_connect_already_trying() {
let sender = create_rw_server_sender();
{
let mut guard = sender.write().await;
guard.server_ip = "192.168.1.100:9000".to_string();
guard.is_try_connect = true; }
assert!(!sender.is_need_connect().await);
assert!(!sender.is_need_connect().await);
}
#[cfg(not(feature = "native-db"))]
#[tokio::test]
async fn test_trait_write_received_times_updates_timestamp() {
let sender = create_rw_server_sender();
{
let guard = sender.read().await;
assert_eq!(guard.server_received_times, 0);
}
sender.write_received_times().await;
{
let guard = sender.read().await;
assert!(guard.server_received_times > 0);
let now_ts = now().timestamp();
assert!((guard.server_received_times - now_ts).abs() <= 1);
}
}
#[cfg(not(feature = "native-db"))]
#[tokio::test]
async fn test_trait_remove_ip() {
let sender = create_rw_server_sender();
{
let mut guard = sender.write().await;
guard.server_ip = "192.168.1.100:9000".to_string();
}
sender.remove_ip().await;
{
let guard = sender.read().await;
assert!(guard.server_ip.is_empty());
}
}
#[cfg(not(feature = "native-db"))]
#[tokio::test]
async fn test_trait_send_status_through_rwlock() {
let sender = create_rw_server_sender();
let mut rx = sender.get_status_receiver().await.expect("receiver");
sender.send_status(SenderStatus::Start).await;
let status = rx.recv().await;
assert!(status.is_some());
assert_eq!(status.unwrap(), SenderStatus::Start);
}
}