use crate::core::error::Result;
#[allow(unused_imports)]
use crate::core::error::XLinkError;
use crate::core::traits::Channel;
use crate::core::types::{ChannelState, ChannelType, DeviceId, Message, NetworkType};
use async_trait::async_trait;
#[cfg(not(feature = "test_no_external_deps"))]
#[cfg(all(not(feature = "test_no_external_deps"), not(test)))]
use reqwest;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;
/// Internet channel that publishes messages to ntfy topics over HTTP, with a
/// primary server and an ordered list of backup servers to fail over to.
pub struct RemoteChannel {
/// Identifier of the local device; only used for logging in this file.
local_device_id: DeviceId,
/// Base URL used while `current_server_index` is 0.
primary_server_url: String,
/// Base URLs tried, in order, after the primary has failed.
backup_server_urls: Vec<String>,
/// Selects the active server: 0 is the primary, `k > 0` is
/// `backup_server_urls[k - 1]`.
current_server_index: Arc<Mutex<usize>>,
/// Topic name to publish to for each known peer device.
peer_topics: Arc<Mutex<HashMap<DeviceId, String>>>,
/// When true, `send` only logs and records the topic instead of doing HTTP.
test_mode: bool,
}
impl RemoteChannel {
    /// Creates a channel that uses `server_url` as its primary server, falling
    /// back to `https://ntfy.sh` when none is given, together with the built-in
    /// list of backup servers.
    pub fn new(local_device_id: DeviceId, server_url: Option<String>) -> Self {
        let primary = match server_url {
            Some(url) => url,
            None => String::from("https://ntfy.sh"),
        };
        let backups: Vec<String> = ["https://ntfy.sh", "https://ntfy.net", "https://ntfy.dev"]
            .iter()
            .map(|url| (*url).to_owned())
            .collect();
        Self::with_failover(local_device_id, primary, backups)
    }

    /// Creates a channel with an explicit primary server and backup list.
    ///
    /// The channel starts on the primary server, knows no peer topics, and has
    /// test mode switched off.
    pub fn with_failover(
        local_device_id: DeviceId,
        primary_url: String,
        backup_urls: Vec<String>,
    ) -> Self {
        let current_server_index = Arc::new(Mutex::new(0));
        let peer_topics = Arc::new(Mutex::new(HashMap::new()));
        Self {
            local_device_id,
            primary_server_url: primary_url,
            backup_server_urls: backup_urls,
            current_server_index,
            peer_topics,
            test_mode: false,
        }
    }

    /// Turns the runtime test-mode flag on or off.
    pub fn set_test_mode(&mut self, enabled: bool) {
        self.test_mode = enabled;
    }

    /// Reports whether sends should be mocked: either the runtime flag is set,
    /// or the crate was compiled for tests / with `test_no_external_deps`.
    fn is_test_mode(&self) -> bool {
        cfg!(any(feature = "test_no_external_deps", test)) || self.test_mode
    }

    /// Returns the base URL of the currently selected server.
    ///
    /// Index 0 selects the primary; index `k > 0` selects backup `k - 1`. An
    /// index past the end of the backup list falls back to the primary URL.
    pub async fn current_server_url(&self) -> String {
        let selected = *self.current_server_index.lock().await;
        selected
            .checked_sub(1)
            .and_then(|backup_slot| self.backup_server_urls.get(backup_slot))
            .unwrap_or(&self.primary_server_url)
            .clone()
    }

    /// Advances to the next backup server.
    ///
    /// Returns `true` after moving forward, or `false` (leaving the selection
    /// untouched) when the last backup is already selected.
    pub async fn switch_to_next_server(&self) -> bool {
        let mut selected = self.current_server_index.lock().await;
        if *selected >= self.backup_server_urls.len() {
            log::error!("[Remote] No more backup servers available, staying with current server");
            return false;
        }

        *selected += 1;
        let position = *selected;
        let backup_url = self
            .backup_server_urls
            .get(position - 1)
            .unwrap_or(&self.primary_server_url);
        log::warn!(
            "[Remote] Switched to backup server {}: {}",
            position,
            backup_url
        );
        true
    }

    /// Records (or overwrites) the topic used to reach `device_id`.
    pub async fn register_peer_topic(&self, device_id: DeviceId, topic: String) {
        self.peer_topics.lock().await.insert(device_id, topic);
    }
}
#[async_trait]
impl Channel for RemoteChannel {
    /// Identifies this channel as the Internet transport.
    fn channel_type(&self) -> ChannelType {
        ChannelType::Internet
    }

    /// Publishes `message` to the recipient's ntfy topic.
    ///
    /// The topic is the one registered for `message.recipient`, or the
    /// recipient id rendered as a string when none is registered. In test mode
    /// (see `is_test_mode`) nothing is sent: the call logs, records the topic
    /// and returns `Ok(())`.
    ///
    /// Otherwise the JSON-serialized message is POSTed to the current server.
    /// On a network error or a non-success HTTP status the channel switches to
    /// the next backup server and retries, until the backups are exhausted.
    ///
    /// # Errors
    ///
    /// * the serialization error converted into `XLinkError` if `message`
    ///   cannot be encoded as JSON;
    /// * `XLinkError::channel_disconnected` once no further server can be tried.
    async fn send(&self, message: Message) -> Result<()> {
        // Copy the topic out so the map lock is released before any I/O.
        let topic = {
            let topics = self.peer_topics.lock().await;
            topics.get(&message.recipient).cloned()
        };
        if self.is_test_mode() {
            let topic_str = topic.unwrap_or_else(|| message.recipient.to_string());
            log::info!(
                "[Remote] Mock sending message {} to ntfy topic {} (test mode)",
                message.id,
                topic_str
            );
            self.register_peer_topic(message.recipient, topic_str).await;
            return Ok(());
        }
        #[cfg(all(not(feature = "test_no_external_deps"), not(test)))]
        {
            // Everything below is identical for every attempt, so it is built
            // once instead of once per failover iteration.
            let needs_registration = topic.is_none();
            let topic_str = topic.unwrap_or_else(|| message.recipient.to_string());
            let payload = serde_json::to_vec(&message).map_err(Into::<XLinkError>::into)?;
            // One client for all attempts: each `reqwest::Client` owns its own
            // connection pool, so creating one per retry is wasteful.
            let client = reqwest::Client::new();
            let max_attempts = self.backup_server_urls.len() + 1;
            let mut attempts = 0;

            // NOTE(review): nothing visible in this file moves the server index
            // back to the primary, so after a failover later sends start from
            // whichever backup was selected last — confirm this is intended.
            loop {
                let current_server = self.current_server_url().await;
                let url = format!("{}/{}", current_server, topic_str);
                log::info!(
                    "[Remote] Attempting to publish message {} to ntfy topic {} on server {}",
                    message.id,
                    topic_str,
                    current_server
                );
                let outcome = client
                    .post(&url)
                    .header("Content-Type", "application/json")
                    // The body is consumed by the request, so each attempt
                    // gets its own copy of the pre-serialized bytes.
                    .body(payload.clone())
                    .send()
                    .await;
                match outcome {
                    Ok(response) => {
                        let status = response.status();
                        if status.is_success() {
                            log::info!(
                                "[Remote] Successfully published message {} to ntfy on server {}",
                                message.id,
                                current_server
                            );
                            // Remember the fallback topic only when the peer
                            // had no explicit registration.
                            if needs_registration {
                                self.register_peer_topic(message.recipient, topic_str).await;
                            }
                            return Ok(());
                        }
                        let error_text = response
                            .text()
                            .await
                            .unwrap_or_else(|_| "Unknown error".to_string());
                        log::warn!(
                            "[Remote] Server {} returned error for message {}: {} - {}",
                            current_server,
                            message.id,
                            status,
                            error_text
                        );
                        if attempts < max_attempts - 1 && self.switch_to_next_server().await {
                            attempts += 1;
                            continue;
                        }
                        return Err(XLinkError::channel_disconnected(
                            format!("ntfy request failed: {} - {}", status, error_text),
                            file!(),
                        ));
                    }
                    Err(e) => {
                        log::warn!(
                            "[Remote] Network error sending message {} to server {}: {}",
                            message.id,
                            current_server,
                            e
                        );
                        if attempts < max_attempts - 1 && self.switch_to_next_server().await {
                            attempts += 1;
                            continue;
                        }
                        return Err(XLinkError::channel_disconnected(
                            format!(
                                "Failed to send to ntfy after {} attempts: {}",
                                attempts + 1,
                                e
                            ),
                            file!(),
                        ));
                    }
                }
            }
        }
        // Only compiled when the HTTP path above is compiled out; gives the
        // function a tail value in test builds.
        #[cfg(any(feature = "test_no_external_deps", test))]
        {
            Ok(())
        }
    }

    /// Reports a fixed, always-available channel state.
    ///
    /// No measurement is performed and `_target` is ignored; every field is a
    /// hard-coded constant.
    async fn check_state(&self, _target: &DeviceId) -> Result<ChannelState> {
        Ok(ChannelState {
            available: true,
            rtt_ms: 200,
            jitter_ms: 50,
            packet_loss_rate: 0.01,
            bandwidth_bps: 10_000_000,
            signal_strength: None,
            distance_meters: None,
            network_type: NetworkType::Cellular5G,
            failure_count: 0,
            last_heartbeat: 0,
        })
    }

    /// Logs that the channel has started; no other work is done.
    async fn start(&self) -> Result<()> {
        log::info!(
            "Remote ntfy channel started for device {}",
            self.local_device_id
        );
        Ok(())
    }
}