pub mod capability;
pub mod channels;
pub mod core;
pub mod crypto;
pub mod router;
pub mod storage;
pub mod discovery;
pub mod ffi;
pub mod group;
pub mod heartbeat;
pub mod media;
pub mod utils;
use crate::capability::manager::CapabilityManager;
use crate::core::error::Result;
use crate::core::traits::{Channel, MessageHandler, Storage};
use crate::core::types::{ChannelType, DeviceCapabilities, DeviceId, Message, MessagePayload};
use crate::crypto::engine::CryptoEngine;
use crate::router::selector::Router;
#[cfg(not(feature = "test_no_external_deps"))]
use crate::discovery::manager::DiscoveryManager;
#[cfg(feature = "test_no_external_deps")]
use crate::discovery::manager_test::DiscoveryManager;
use crate::group::manager::GroupManager;
use crate::heartbeat::manager::HeartbeatManager;
use crate::media::stream_manager::StreamManager;
use x25519_dalek::PublicKey;
use async_trait::async_trait;
use dashmap::DashMap;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::mpsc;
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
/// Top-level SDK handle for one local device.
///
/// Owns the routing, crypto, storage, group, heartbeat, discovery and
/// streaming components, plus the join handles of every task spawned by
/// `start`. Most components sit behind `Arc` so they can be shared with
/// spawned tasks and with the message handler handed to channels.
pub struct XLink {
/// Identifier of the local device, taken from the `DeviceCapabilities` given to the constructor.
device_id: DeviceId,
/// Channel registry; `send` asks it to select a channel for each outgoing message.
router: Arc<Router>,
/// Capability store shared with the router, heartbeat, discovery and detector components.
cap_manager: Arc<CapabilityManager>,
/// Crypto engine; source of the local public key and of the exported SDK state.
crypto: Arc<CryptoEngine>,
/// Persistence backend for messages, pending messages and audit logs.
storage: Arc<dyn Storage>,
/// Group membership, group keys and group broadcast.
group_manager: Arc<GroupManager>,
/// Heartbeat component; receives incoming `Ping`/`Pong` payloads.
heartbeat_manager: Arc<Mutex<HeartbeatManager>>,
/// Discovery component (real or test implementation, chosen by the `test_no_external_deps` feature).
discovery_manager: Arc<Mutex<DiscoveryManager>>,
/// Sends large binary payloads as streams and reassembles incoming stream chunks.
stream_manager: Arc<StreamManager>,
/// Local capability detector, polled every 30 seconds by a task spawned in `start`.
cap_detector: Arc<Mutex<crate::capability::detector::LocalCapabilityDetector>>,
/// Per-device `(window start, message count)` pairs used for rate limiting.
rate_limiter: Arc<DashMap<DeviceId, (Instant, u32)>>,
/// Send/receive counters exposed through `metrics_report`.
metrics: Arc<crate::core::metrics::MetricsCollector>,
/// Join handles returned by `Channel::start_with_handler`, keyed by channel type.
receive_tasks: Arc<DashMap<ChannelType, JoinHandle<()>>>,
/// Join handles of named background tasks (heartbeat, discovery, cleanup, ...).
background_tasks: Arc<DashMap<String, JoinHandle<()>>>,
/// Receiving end of the application message queue, drained by `receive`.
app_rx: Arc<Mutex<mpsc::Receiver<Message>>>,
/// Sending end of the application message queue (created with capacity 100).
app_tx: mpsc::Sender<Message>,
/// Compliance settings; `retention_days` drives the data cleanup task.
compliance: Arc<crate::core::types::ComplianceConfig>,
/// Registered plugins keyed by plugin name.
plugins: Arc<DashMap<String, Arc<dyn crate::core::traits::Plugin>>>,
}
impl Drop for XLink {
    /// Releases everything the SDK owns when the handle is dropped.
    ///
    /// Every receive and background task is aborted and removed from its
    /// registry, then the in-memory state of each component (crypto sessions,
    /// remote devices, groups, streams, storage indexes, router channels,
    /// metrics, rate-limit windows, plugins) is cleared. Only synchronous
    /// calls are used because `drop` cannot await.
    fn drop(&mut self) {
        log::info!("Dropping UnifiedPush SDK for device {}", self.device_id);

        // `retain` visits each entry in place, so every handle is aborted and
        // removed in a single pass without first snapshotting the keys.
        self.receive_tasks.retain(|_, task| {
            task.abort();
            false
        });
        self.background_tasks.retain(|_, task| {
            task.abort();
            false
        });

        self.crypto.clear_sessions();
        self.cap_manager.clear_remote_devices();
        self.group_manager.clear_groups();
        self.stream_manager.clear_streams();
        self.storage.clear_indexes();
        self.router.clear_channels_sync();
        self.metrics.clear();

        // Same bulk-clear calls that `stop` uses for these maps.
        self.rate_limiter.clear();
        self.plugins.clear();
    }
}
/// Message handler handed to channels; routes incoming messages to the
/// internal managers and forwards application-level messages to `app_tx`.
///
/// The group, heartbeat and stream managers are held as `Weak` references and
/// upgraded per message, so the handler does not keep them alive on its own.
struct SdkMessageHandler {
/// Queue on which messages are delivered to the application.
app_tx: mpsc::Sender<Message>,
/// Held but not read by `handle_message` (hence the underscore prefix).
_crypto: Arc<CryptoEngine>,
/// Receives `GroupInvite` payloads.
group_manager: std::sync::Weak<GroupManager>,
/// Receives `Ping`/`Pong` payloads.
heartbeat_manager: std::sync::Weak<Mutex<HeartbeatManager>>,
/// Reassembles `StreamChunk` payloads.
stream_manager: std::sync::Weak<StreamManager>,
/// Per-device `(window start, message count)` pairs, shared with `XLink`.
rate_limiter: Arc<DashMap<DeviceId, (Instant, u32)>>,
/// Receive counters, shared with `XLink`.
metrics: Arc<crate::core::metrics::MetricsCollector>,
}
#[async_trait]
impl MessageHandler for SdkMessageHandler {
async fn handle_message(&self, mut message: Message) -> Result<()> {
let now = Instant::now();
let should_rate_limit = {
match self.rate_limiter.try_get_mut(&message.sender) {
dashmap::try_result::TryResult::Present(mut rate_entry) => {
let (last_reset, count) = rate_entry.value_mut();
let duration = now.saturating_duration_since(*last_reset);
if duration > Duration::from_secs(1) {
*last_reset = now;
*count = 1;
false
} else {
*count = count.saturating_add(1);
*count > 100
}
}
dashmap::try_result::TryResult::Absent | dashmap::try_result::TryResult::Locked => {
false
}
}
};
if should_rate_limit {
log::warn!(
"DoS Protection: Rate limit exceeded for device {}",
message.sender
);
return Err(crate::core::error::XLinkError::resource_exhausted(
format!("Rate limit exceeded for device {}", message.sender),
101,
100,
file!(),
));
}
log::info!("SDK received message: {}", message.id);
self.metrics.record_receive(0);
match message.payload {
MessagePayload::Ping(_) | MessagePayload::Pong(_) => {
if let Some(hm) = self.heartbeat_manager.upgrade() {
let hb = hm.lock().await;
hb.handle_heartbeat(&message).await;
}
return Ok(()); }
MessagePayload::StreamChunk {
stream_id,
total_chunks,
chunk_index,
data,
..
} => {
if let Some(sm) = self.stream_manager.upgrade() {
match sm
.handle_chunk(stream_id, total_chunks, chunk_index, data)
.await
{
Ok(Some(full_data)) => {
message.payload = MessagePayload::Binary(full_data);
}
Ok(None) => {
return Ok(()); }
Err(e) => {
log::error!("Stream reassembly error: {}", e);
return Ok(());
}
}
} else {
return Ok(());
}
}
MessagePayload::GroupInvite { .. } => {
if let Some(gm) = self.group_manager.upgrade() {
gm.as_ref().handle_incoming_group_message(&message).await?;
}
}
_ => {
}
}
if let Err(e) = self.app_tx.send(message).await {
log::error!("Failed to deliver message to app: {}", e);
}
Ok(())
}
}
impl XLink {
/// Creates an SDK instance backed by file storage at the path `"storage"`.
///
/// Thin wrapper over `with_storage_path`; its errors are returned unchanged.
pub async fn new(config: DeviceCapabilities, channels: Vec<Arc<dyn Channel>>) -> Result<Self> {
    let default_path = "storage".to_string();
    Self::with_storage_path(config, channels, default_path).await
}
/// Creates an SDK instance backed by a `FileStorage` opened at `storage_path`.
///
/// # Errors
/// Returns the error from `FileStorage::new` or from `with_storage`.
pub async fn with_storage_path(
    config: DeviceCapabilities,
    channels: Vec<Arc<dyn Channel>>,
    storage_path: String,
) -> Result<Self> {
    let file_store = crate::storage::file_store::FileStorage::new(&storage_path).await?;
    let shared_store = Arc::new(file_store);
    Self::with_storage(config, channels, shared_store).await
}
/// Builds an SDK instance on top of a caller-supplied storage backend.
///
/// Wires up every component but starts nothing: no task is spawned until
/// `start` is called. Channels are indexed by `channel_type()`, so if two
/// channels report the same type the later one in `channels` wins. The
/// application queue is created with a capacity of 100 messages and the
/// compliance settings start at their `Default` value.
pub async fn with_storage(
    config: DeviceCapabilities,
    channels: Vec<Arc<dyn Channel>>,
    storage: Arc<dyn Storage>,
) -> Result<Self> {
    // Copy the id out before `config` is moved into the capability manager.
    let device_id = config.device_id;
    let cap_manager = Arc::new(CapabilityManager::new(config));
    let crypto = Arc::new(CryptoEngine::new());
    let (app_tx, app_rx) = mpsc::channel(100);

    let channels_by_type: HashMap<_, _> = channels
        .into_iter()
        .map(|channel| (channel.channel_type(), channel))
        .collect();
    let router = Arc::new(Router::new(channels_by_type, Arc::clone(&cap_manager)));

    let group_manager = Arc::new(GroupManager::new(device_id, Arc::clone(&router)));
    let heartbeat = HeartbeatManager::new(device_id, Arc::clone(&router), Arc::clone(&cap_manager));
    let discovery = DiscoveryManager::new(Arc::clone(&cap_manager));
    let stream_manager = Arc::new(StreamManager::new(device_id, Arc::clone(&router)));
    let detector =
        crate::capability::detector::LocalCapabilityDetector::new(Arc::clone(&cap_manager));

    Ok(Self {
        device_id,
        router,
        cap_manager,
        crypto,
        storage,
        group_manager,
        heartbeat_manager: Arc::new(Mutex::new(heartbeat)),
        discovery_manager: Arc::new(Mutex::new(discovery)),
        stream_manager,
        cap_detector: Arc::new(Mutex::new(detector)),
        rate_limiter: Arc::new(DashMap::new()),
        metrics: Arc::new(crate::core::metrics::MetricsCollector::new()),
        receive_tasks: Arc::new(DashMap::new()),
        background_tasks: Arc::new(DashMap::new()),
        app_rx: Arc::new(Mutex::new(app_rx)),
        app_tx,
        compliance: Arc::new(crate::core::types::ComplianceConfig::default()),
        plugins: Arc::new(DashMap::new()),
    })
}
/// Starts (or restarts) the SDK.
///
/// Steps, in order:
/// 1. `stop()` is called first so a repeated `start` never leaves old tasks running.
/// 2. Crash recovery is attempted; a failure is logged and does not abort startup.
/// 3. Every channel known to the router is started with a fresh message
///    handler; channels that fail to start are logged and skipped.
/// 4. Background tasks are spawned and registered under these names:
///    `heartbeat`, `discovery_mdns`, `discovery_ble`, `capability_detection`
///    (every 30 s), `data_cleanup` (every 24 h, only acts when
///    `retention_days > 0`) and `memory_cleanup` (every 6 h).
///
/// Always returns `Ok(())`; individual failures are only logged.
///
/// NOTE(review): `stop()` calls `router.clear_channels()` before the channels
/// are iterated here - confirm that call leaves the channel registry usable.
pub async fn start(&self) -> Result<()> {
    log::info!("Starting UnifiedPush SDK for device {}", self.device_id);
    self.stop().await;
    match self.recover_from_crash().await {
        Ok(_) => log::info!("Crash recovery completed successfully"),
        Err(e) => log::error!("Crash recovery failed: {}", e),
    }

    let handler = Arc::new(SdkMessageHandler {
        app_tx: self.app_tx.clone(),
        _crypto: self.crypto.clone(),
        group_manager: Arc::downgrade(&self.group_manager),
        heartbeat_manager: Arc::downgrade(&self.heartbeat_manager),
        stream_manager: Arc::downgrade(&self.stream_manager),
        rate_limiter: self.rate_limiter.clone(),
        metrics: self.metrics.clone(),
    });
    for (ctype, channel) in self.router.get_channels() {
        let channel = channel.clone();
        let ctype = *ctype;
        let h = handler.clone();
        match channel.start_with_handler(h).await {
            Ok(Some(task)) => {
                self.receive_tasks.insert(ctype, task);
            }
            Ok(None) => {
                log::debug!("Channel {:?} started without background task", ctype);
            }
            Err(e) => log::error!("Failed to start channel {:?}: {}", ctype, e),
        }
    }

    if let Some(task) = self.heartbeat_manager.lock().await.start() {
        self.background_tasks.insert("heartbeat".to_string(), task);
    }
    let (mdns_task, ble_task) = self.discovery_manager.lock().await.start_discovery().await;
    if let Some(task) = mdns_task {
        self.background_tasks
            .insert("discovery_mdns".to_string(), task);
    }
    if let Some(task) = ble_task {
        self.background_tasks
            .insert("discovery_ble".to_string(), task);
    }

    let detector = self.cap_detector.clone();
    let detector_task = tokio::spawn(async move {
        loop {
            {
                // Skip this round instead of waiting if the detector is busy.
                if let Ok(mut d) = detector.try_lock() {
                    d.detect_and_update();
                }
            }
            tokio::time::sleep(Duration::from_secs(30)).await;
        }
    });
    self.background_tasks
        .insert("capability_detection".to_string(), detector_task);

    let storage = self.storage.clone();
    // Captured once: later compliance updates do not reach this running task.
    let retention_days = self.compliance.retention_days;
    let cleanup_task = tokio::spawn(async move {
        loop {
            if retention_days > 0 {
                match storage.cleanup_old_data(retention_days).await {
                    Ok(count) => log::info!("Compliance: Cleaned up {} old records", count),
                    Err(e) => log::error!("Compliance: Cleanup failed: {}", e),
                }
            }
            tokio::time::sleep(Duration::from_secs(24 * 3600)).await;
        }
    });
    self.background_tasks
        .insert("data_cleanup".to_string(), cleanup_task);

    let group_manager = self.group_manager.clone();
    let memory_cleanup_task = tokio::spawn(async move {
        // The loop wakes every 6 hours; broadcast results are meant to be
        // purged every 12 hours, i.e. on every other wake-up starting with
        // the first. The previous guard measured `Instant::now().elapsed()`,
        // which is always ~0 s, so it was true on every iteration.
        let mut broadcast_cleanup_due = true;
        loop {
            group_manager.cleanup_expired_invites(24);
            if broadcast_cleanup_due {
                group_manager.cleanup_expired_broadcast_results().await;
            }
            broadcast_cleanup_due = !broadcast_cleanup_due;
            tokio::time::sleep(Duration::from_secs(6 * 3600)).await;
        }
    });
    self.background_tasks
        .insert("memory_cleanup".to_string(), memory_cleanup_task);
    Ok(())
}
/// Serializes the crypto engine's exported state to JSON bytes.
///
/// Only the crypto state is included; no other component is exported.
///
/// # Errors
/// Returns the error from `CryptoEngine::export_state`, or a
/// serialization-failed error if JSON encoding fails.
pub fn export_sdk_state(&self) -> Result<Vec<u8>> {
    let crypto_state = self.crypto.export_state()?;
    let encoded = serde_json::to_vec(&crypto_state).map_err(|json_err| {
        crate::core::error::XLinkError::serialization_failed(
            "export_sdk_state",
            &format!("Failed to serialize SDK state: {}", json_err),
            file!(),
        )
    })?;
    Ok(encoded)
}
/// Replaces the crypto engine with one rebuilt from JSON produced by `export_sdk_state`.
///
/// Only `self.crypto` is swapped; message handlers created earlier keep
/// their clone of the previous engine.
///
/// # Errors
/// Returns a serialization-failed error if `data` is not valid JSON for the
/// crypto state, or the error from `CryptoEngine::import_state`.
pub fn import_sdk_state(&mut self, data: &[u8]) -> Result<()> {
    let parsed = serde_json::from_slice::<crate::crypto::engine::CryptoState>(data);
    let crypto_state = parsed.map_err(|json_err| {
        crate::core::error::XLinkError::serialization_failed(
            "import_sdk_state",
            &format!("Failed to deserialize SDK state: {}", json_err),
            file!(),
        )
    })?;
    let restored_engine = crate::crypto::engine::CryptoEngine::import_state(crypto_state)?;
    self.crypto = Arc::new(restored_engine);
    Ok(())
}
/// Asks the discovery manager to simulate a background discovery of `device_id`.
///
/// The discovery manager lock is held for the duration of the call.
pub async fn simulate_background_discovery(&self, device_id: DeviceId) -> Result<()> {
    self.discovery_manager
        .lock()
        .await
        .simulate_background_discovery(device_id)
        .await
}
/// Stops the SDK and clears its in-memory state.
///
/// Aborts and forgets every receive and background task, stops the
/// heartbeat and discovery components, then clears crypto sessions, remote
/// devices, groups, streams, file-storage indexes (only when the backend is
/// a `FileStorage`), rate-limit windows, plugins, metrics and router
/// channels. Plugins are dropped without `on_unload` being called.
pub async fn stop(&self) {
    log::info!("Stopping UnifiedPush SDK for device {}", self.device_id);

    self.receive_tasks
        .iter()
        .for_each(|entry| entry.value().abort());
    self.receive_tasks.clear();
    self.background_tasks
        .iter()
        .for_each(|entry| entry.value().abort());
    self.background_tasks.clear();

    self.heartbeat_manager.lock().await.stop();
    {
        // Scope the guard so the discovery lock is released before the rest runs.
        let discovery = self.discovery_manager.lock().await;
        discovery.stop_discovery().await;
        discovery.clear_cache().await;
    }

    self.crypto.clear_sessions();
    self.cap_manager.clear_remote_devices();
    self.group_manager.clear_groups();
    self.stream_manager.clear_streams();

    let file_storage = self
        .storage
        .as_any()
        .downcast_ref::<crate::storage::file_store::FileStorage>();
    if let Some(file_storage) = file_storage {
        file_storage.cleanup_indexes();
    }

    self.rate_limiter.clear();
    self.plugins.clear();
    self.metrics.clear();
    self.router.clear_channels().await;
    log::info!("UnifiedPush SDK stopped successfully");
}
/// Sends `payload` to `recipient`.
///
/// Steps, in order:
/// 1. Enforce the local send rate limit (at most 100 sends per one-second window).
/// 2. `Binary` payloads larger than 32 KiB are handed to the stream manager
///    and the call returns as soon as that completes.
/// 3. Otherwise the message is saved to storage, a channel is selected and
///    the message is sent. On success the stored copy is removed; on failure
///    the message is saved to the pending queue and the send error is returned.
///
/// # Errors
/// Returns a resource-exhausted error when the rate limit is exceeded, and
/// otherwise the error from storage, channel selection, the stream manager
/// or the channel itself.
pub async fn send(&self, recipient: DeviceId, payload: MessagePayload) -> Result<()> {
    log::info!(
        "Sending message from {} to {} with payload: {:?}",
        self.device_id,
        recipient,
        payload
    );
    {
        // Scoped so the map guard is released before the first `.await`.
        let now = Instant::now();
        let mut rate_entry = self.rate_limiter.entry(self.device_id).or_insert((now, 0));
        let (last_reset, count) = rate_entry.value_mut();
        let duration = now.saturating_duration_since(*last_reset);
        if duration >= Duration::from_secs(1) {
            *last_reset = now;
            *count = 1;
        } else {
            *count = count.saturating_add(1);
            if *count > 100 {
                log::warn!(
                    "DoS Protection: Send rate limit exceeded for device {}",
                    self.device_id
                );
                return Err(crate::core::error::XLinkError::resource_exhausted(
                    format!("Send rate limit exceeded for device {}", self.device_id),
                    (*count).into(),
                    100,
                    file!(),
                ));
            }
        }
    }
    // NOTE(review): this records a zero-byte Internet send for every attempt,
    // and a successful send is recorded again below with the real channel and
    // size - confirm the double count is intended.
    self.metrics.record_send(ChannelType::Internet, 0);

    // Large binary payloads go through the stream manager. The payload is
    // owned here, so the buffer is moved rather than cloned.
    let payload = match payload {
        MessagePayload::Binary(data) if data.len() > 1024 * 32 => {
            log::info!("Using stream transmission for large message");
            self.stream_manager
                .send_video_stream(recipient, data, None)
                .await?;
            return Ok(());
        }
        other => other,
    };

    let message = Message::new(self.device_id, recipient, payload);
    log::info!("Created message: {}", message.id);
    self.storage.save_message(&message).await?;
    log::info!("Message saved to storage");

    let channel = match self.router.select_channel(&message).await {
        Ok(ch) => ch,
        Err(e) if e.code().0 == 105 => {
            // NOTE(review): on error code 105 a fabricated "healthy" state is
            // installed for every channel and selection is retried; the log
            // text calls it a test state - confirm this belongs in production.
            log::warn!(
                "No route found for {}, adding default test state",
                recipient
            );
            for ctype in self.router.get_channels().keys() {
                let state = crate::core::types::ChannelState {
                    available: true,
                    rtt_ms: 50,
                    jitter_ms: 5,
                    packet_loss_rate: 0.0,
                    bandwidth_bps: 10_000_000,
                    signal_strength: Some(80),
                    network_type: crate::core::types::NetworkType::WiFi,
                    failure_count: 0,
                    last_heartbeat: std::time::SystemTime::now()
                        .duration_since(std::time::UNIX_EPOCH)
                        .unwrap_or_else(|_| Duration::from_secs(0))
                        .as_secs(),
                    distance_meters: Some(10.0),
                };
                self.cap_manager
                    .update_channel_state(recipient, *ctype, state);
            }
            self.router.select_channel(&message).await?
        }
        Err(e) => return Err(e),
    };
    log::info!("Selected channel: {:?}", channel.channel_type());

    match channel.send(message.clone()).await {
        Ok(_) => {
            log::info!("Message sent successfully");
            let bytes = match &message.payload {
                MessagePayload::Text(t) => t.len() as u64,
                MessagePayload::Binary(b) => b.len() as u64,
                _ => 0,
            };
            self.metrics.record_send(channel.channel_type(), bytes);
            self.storage.remove_message(&message.id).await?;
            // Best effort: the message may never have been in the pending queue.
            let _ = self.storage.remove_pending_message(&message.id).await;
            Ok(())
        }
        Err(e) => {
            log::error!("Failed to send message: {}", e);
            if let Err(save_err) = self.storage.save_pending_message(&message).await {
                log::error!("Failed to save pending message for recovery: {}", save_err);
            } else {
                log::info!("Saved message {} to pending queue for recovery", message.id);
            }
            Err(e)
        }
    }
}
/// Creates a group named `name` with the given members and returns its id.
///
/// Before the group is created, the local device's public key is registered
/// with the group manager, and every other member is registered with the
/// public half of a freshly generated random X25519 secret.
///
/// NOTE(review): the random per-member keys are not the members' real keys
/// and the secrets are discarded here - this looks like placeholder
/// behaviour; confirm before relying on it for real peers.
///
/// # Errors
/// Returns the error from key registration or from group creation.
pub async fn create_group(
    &self,
    name: String,
    members: Vec<DeviceId>,
) -> Result<crate::core::types::GroupId> {
    use rand::rngs::OsRng;
    use x25519_dalek::StaticSecret;

    let own_key = self.crypto.public_key();
    self.group_manager
        .register_device_key(self.device_id, own_key)?;

    let peers = members.iter().copied().filter(|id| *id != self.device_id);
    for peer_id in peers {
        let placeholder_secret = StaticSecret::random_from_rng(OsRng);
        let placeholder_key = PublicKey::from(&placeholder_secret);
        self.group_manager
            .register_device_key(peer_id, placeholder_key)?;
    }

    let created = self.group_manager.create_group(name, members).await?;
    Ok(created.id)
}
/// Broadcasts `payload` to the group `group_id` through the group manager.
///
/// The value returned by the broadcast is discarded.
///
/// # Errors
/// Returns the error reported by the group manager's `broadcast`.
pub async fn send_to_group(
&self,
group_id: crate::core::types::GroupId,
payload: MessagePayload,
) -> Result<()> {
self.group_manager.broadcast(group_id, payload).await?;
Ok(())
}
/// Registers `public_key` for `device_id` with the group manager.
///
/// # Errors
/// Returns the error reported by the group manager.
pub fn register_device_key(&self, device_id: DeviceId, public_key: PublicKey) -> Result<()> {
self.group_manager
.register_device_key(device_id, public_key)
}
/// Encrypts `payload` for the group `group_id` by delegating to the group manager.
///
/// # Errors
/// Returns the error reported by the group manager.
pub fn encrypt_group_message(
&self,
group_id: crate::core::types::GroupId,
payload: &MessagePayload,
) -> Result<MessagePayload> {
self.group_manager.encrypt_group_message(group_id, payload)
}
/// Decrypts `encrypted_payload` for the group `group_id` by delegating to the group manager.
///
/// # Errors
/// Returns the error reported by the group manager.
pub fn decrypt_group_message(
&self,
group_id: crate::core::types::GroupId,
encrypted_payload: &MessagePayload,
) -> Result<MessagePayload> {
self.group_manager
.decrypt_group_message(group_id, encrypted_payload)
}
/// Asks the group manager to rotate the key of the group `group_id`.
///
/// # Errors
/// Returns the error reported by the group manager.
pub async fn rotate_group_key(&self, group_id: crate::core::types::GroupId) -> Result<()> {
self.group_manager.rotate_group_key(group_id).await
}
/// Returns a shared handle to the router (a reference-count bump, not a copy).
pub fn router(&self) -> Arc<Router> {
    Arc::clone(&self.router)
}
/// Returns a shared handle to the group manager (a reference-count bump, not a copy).
pub fn group_manager(&self) -> Arc<GroupManager> {
    Arc::clone(&self.group_manager)
}
/// Waits for the next message delivered to the application queue.
///
/// The receiver lock is held while waiting, so concurrent callers are
/// served one at a time. Returns whatever the underlying receiver's `recv`
/// yields, including `None`.
pub async fn receive(&self) -> Option<Message> {
    self.app_rx.lock().await.recv().await
}
/// Builds a new message handler wired to this SDK instance.
///
/// The handler shares the application queue, rate limiter and metrics, and
/// holds only weak references to the group, heartbeat and stream managers.
pub fn get_message_handler(&self) -> Arc<dyn MessageHandler> {
    let handler = SdkMessageHandler {
        app_tx: self.app_tx.clone(),
        _crypto: Arc::clone(&self.crypto),
        group_manager: Arc::downgrade(&self.group_manager),
        heartbeat_manager: Arc::downgrade(&self.heartbeat_manager),
        stream_manager: Arc::downgrade(&self.stream_manager),
        rate_limiter: Arc::clone(&self.rate_limiter),
        metrics: Arc::clone(&self.metrics),
    };
    Arc::new(handler)
}
/// Returns a shared handle to the capability manager (a reference-count bump, not a copy).
pub fn capability_manager(&self) -> Arc<CapabilityManager> {
    Arc::clone(&self.cap_manager)
}
/// Returns the identifier of the local device.
pub fn device_id(&self) -> DeviceId {
self.device_id
}
/// Returns the metrics collector's current report.
///
/// Performs the same call as `get_system_metrics`.
pub fn metrics_report(&self) -> crate::core::metrics::MetricsReport {
self.metrics.get_report()
}
/// Returns the public key reported by the crypto engine.
pub fn public_key(&self) -> PublicKey {
self.crypto.public_key()
}
/// Returns an owned copy of the current compliance configuration.
pub fn get_compliance_config(&self) -> crate::core::types::ComplianceConfig {
    (*self.compliance).clone()
}
/// Replaces the compliance configuration with `config`.
///
/// The data cleanup task spawned by `start` reads `retention_days` once when
/// it is spawned, so a task that is already running keeps its old value.
pub fn update_compliance_config(&mut self, config: crate::core::types::ComplianceConfig) {
    let shared_config = Arc::new(config);
    self.compliance = shared_config;
    log::info!("Compliance config updated");
}
/// Fetches audit log entries from storage, passing `100` as the argument to
/// `get_audit_logs` (presumably a maximum entry count - confirm against the
/// `Storage` trait).
///
/// # Errors
/// Returns the error reported by the storage backend.
pub async fn export_audit_logs(&self) -> Result<Vec<String>> {
self.storage.get_audit_logs(100).await
}
/// Writes one audit log entry of the form `[<time>] Device: <id> Action: <action>`.
///
/// Best effort: a storage failure is ignored.
// Currently has no callers in this file, hence the lint allowance.
#[allow(dead_code)]
async fn log_audit(&self, action: &str) {
    let timestamp = std::time::SystemTime::now();
    let entry = format!(
        "[{:?}] Device: {} Action: {}",
        timestamp, self.device_id, action
    );
    let _ = self.storage.save_audit_log(entry).await;
}
/// Returns the metrics collector's current report.
///
/// Performs the same call as `metrics_report`.
pub fn get_system_metrics(&self) -> crate::core::metrics::MetricsReport {
self.metrics.get_report()
}
/// Loads `plugin` and registers it under its own name.
///
/// `on_load` runs first; the plugin is stored only if it succeeds. A plugin
/// already registered under the same name is replaced.
///
/// # Errors
/// Returns the error from the plugin's `on_load`.
pub fn register_plugin(&self, plugin: Arc<dyn crate::core::traits::Plugin>) -> Result<()> {
    let plugin_name = plugin.name().to_string();
    plugin.on_load()?;
    let registry_key = plugin_name.clone();
    self.plugins.insert(registry_key, plugin);
    log::info!("Plugin loaded: {}", plugin_name);
    Ok(())
}
/// Removes the plugin registered as `name` and calls its `on_unload`.
///
/// An unknown name is not an error. The plugin is removed from the registry
/// before `on_unload` runs, so it stays removed even if `on_unload` fails.
///
/// # Errors
/// Returns the error from the plugin's `on_unload`.
pub fn unregister_plugin(&self, name: &str) -> Result<()> {
    match self.plugins.remove(name) {
        None => Ok(()),
        Some((_, removed_plugin)) => {
            removed_plugin.on_unload()?;
            log::info!("Plugin unloaded: {}", name);
            Ok(())
        }
    }
}
/// Wraps `payload` in a new message from the local device to `recipient`
/// and stores it in the pending queue without attempting to send it.
///
/// # Errors
/// Returns the error reported by the storage backend.
pub async fn save_pending_message(
    &self,
    recipient: DeviceId,
    payload: MessagePayload,
) -> Result<()> {
    let pending = Message::new(self.device_id, recipient, payload);
    let storage = &self.storage;
    storage.save_pending_message(&pending).await?;
    log::info!("Saved pending message {} for recovery", pending.id);
    Ok(())
}
/// Loads the pending messages stored for the local device.
///
/// This only reads the queue; nothing is resent or removed here.
///
/// # Errors
/// Returns the error reported by the storage backend.
pub async fn recover_pending_messages(&self) -> Result<Vec<Message>> {
    let storage = &self.storage;
    let recovered = storage
        .get_pending_messages_for_recovery(&self.device_id)
        .await?;
    log::info!("Recovered {} pending messages after crash", recovered.len());
    Ok(recovered)
}
/// Returns the storage usage reported by the storage backend.
///
/// # Errors
/// Returns the error reported by the storage backend.
pub async fn get_storage_usage(&self) -> Result<u64> {
self.storage.get_storage_usage().await
}
/// Asks the storage backend to clean up towards `target_size_bytes` and
/// returns the value it reports (logged as the number of bytes removed).
///
/// # Errors
/// Returns the error reported by the storage backend.
pub async fn cleanup_storage(&self, target_size_bytes: u64) -> Result<u64> {
    let storage = &self.storage;
    let freed = storage.cleanup_storage(target_size_bytes).await?;
    log::info!("Cleaned up {} bytes of storage", freed);
    Ok(freed)
}
/// Runs the low-battery shutdown sequence.
///
/// Reads the pending queue (only its length is used), writes an audit log
/// entry, exports the SDK state and finally asks storage to clean up towards
/// 1 MiB; a failure of that last step is ignored.
///
/// NOTE(review): the exported state is only measured for the log line and is
/// not persisted anywhere in this method - confirm that is intended.
///
/// # Errors
/// Returns the error from reading the pending queue, writing the audit log
/// or exporting the state.
pub async fn handle_low_battery_shutdown(&self) -> Result<()> {
    log::warn!("Low battery detected, performing graceful shutdown");

    let pending_count = self.recover_pending_messages().await?.len();
    log::info!("Saved {} pending messages before shutdown", pending_count);

    let audit_entry = "Low battery shutdown initiated".to_string();
    self.storage.save_audit_log(audit_entry).await?;

    let exported_len = self.export_sdk_state()?.len();
    log::info!("Exported SDK state ({} bytes) for recovery", exported_len);

    let _ = self.cleanup_storage(1024 * 1024).await;
    Ok(())
}
/// Retries every message in the pending queue through `send`.
///
/// A message that is resent successfully is removed from the pending queue;
/// a message that fails is logged and left in place. An audit log entry
/// with the total number of processed messages is written at the end.
///
/// NOTE(review): each retry goes through `send`, which builds a new message
/// and saves it to the pending queue again when the channel send fails. If
/// new messages get new ids, a failed retry leaves both the old and the new
/// entry queued - verify against `Message::new` and the storage backend.
///
/// # Errors
/// Returns the error from reading the pending queue, from removing a resent
/// message (which ends the loop early), or from writing the audit log.
pub async fn recover_from_crash(&self) -> Result<()> {
    log::info!("Starting crash recovery process");
    let pending = self.recover_pending_messages().await?;
    let total_messages = pending.len();
    log::info!("Found {} messages to retry after crash", total_messages);

    let mut resent_count = 0usize;
    for message in pending {
        let outcome = self.send(message.recipient, message.payload.clone()).await;
        if let Err(e) = outcome {
            log::error!("Failed to resend message {} after crash: {}", message.id, e);
            continue;
        }
        self.storage.remove_pending_message(&message.id).await?;
        log::info!("Successfully resent message {} after crash", message.id);
        resent_count += 1;
    }

    // Every message that reaches this point was either resent or failed.
    let failed_count = total_messages - resent_count;
    log::info!(
        "Crash recovery completed: {} messages resent, {} failed",
        resent_count,
        failed_count
    );

    let summary = format!(
        "Crash recovery completed: {} messages processed",
        total_messages
    );
    self.storage.save_audit_log(summary).await?;
    Ok(())
}
}