obscura-server 0.9.0

A server for relaying secure messages using the Signal Protocol
Documentation
use crate::adapters::database::DbPool;
use crate::adapters::database::message_repo::MessageRepository;
use crate::config::MessagingConfig;
use crate::domain::message::{FailedSubmission, Message, RawSubmission, SubmissionErrorCode, SubmissionOutcome};
use crate::domain::notification::UserEvent;
use crate::error::Result;
use crate::services::notification_service::NotificationService;
use opentelemetry::{
    KeyValue, global,
    metrics::{Counter, Histogram},
};
use uuid::Uuid;

#[derive(Clone, Debug)]
pub(crate) struct Metrics {
    pub(crate) sent_total: Counter<u64>,
    pub(crate) fetch_batch_size: Histogram<u64>,
}

impl Metrics {
    fn new() -> Self {
        let meter = global::meter("obscura-server");
        Self {
            sent_total: meter
                .u64_counter("obscura_messages_sent_total")
                .with_description("Total messages successfully sent")
                .build(),
            fetch_batch_size: meter
                .u64_histogram("obscura_message_fetch_batch_size")
                .with_description("Number of messages fetched in a single batch")
                .build(),
        }
    }
}

#[derive(Clone, Debug)]
pub(crate) struct MessageService {
    pool: DbPool,
    repo: MessageRepository,
    notifier: NotificationService,
    ttl_days: i64,
    metrics: Metrics,
}

impl MessageService {
    #[must_use]
    pub(crate) fn new(
        pool: DbPool,
        repo: MessageRepository,
        notifier: NotificationService,
        _config: MessagingConfig,
        ttl_days: i64,
    ) -> Self {
        Self { pool, repo, notifier, ttl_days, metrics: Metrics::new() }
    }

    /// Processes a batch of raw submissions.
    /// Performs structural validation, device checking, and bulk insertion.
    ///
    /// # Errors
    /// Returns `AppError::Database` if any database operation fails.
    #[tracing::instrument(
        err(level = "warn"),
        skip(self, submissions),
        fields(sender_id = %sender_id, count = submissions.len())
    )]
    pub(crate) async fn send(&self, sender_id: Uuid, submissions: Vec<RawSubmission>) -> Result<SubmissionOutcome> {
        let mut failed_submissions = Vec::new();
        let mut potential_valid = Vec::with_capacity(submissions.len());
        let mut device_ids_to_check = std::collections::HashSet::new();

        // Pass 1: Structural Validation
        for raw in submissions {
            let Ok(submission_id) = Uuid::from_slice(&raw.submission_id) else {
                failed_submissions.push(FailedSubmission {
                    submission_id: raw.submission_id,
                    error_code: SubmissionErrorCode::MalformedSubmissionId,
                    error_message: "Invalid submission_id UUID bytes (expected 16)".to_string(),
                });
                continue;
            };

            let Ok(device_id) = Uuid::from_slice(&raw.device_id) else {
                failed_submissions.push(FailedSubmission {
                    submission_id: raw.submission_id,
                    error_code: SubmissionErrorCode::MalformedDeviceId,
                    error_message: "Invalid device_id UUID bytes (expected 16)".to_string(),
                });
                continue;
            };

            if raw.message.is_empty() {
                failed_submissions.push(FailedSubmission {
                    submission_id: raw.submission_id,
                    error_code: SubmissionErrorCode::MessageMissing,
                    error_message: "Missing message payload".to_string(),
                });
                continue;
            }

            device_ids_to_check.insert(device_id);
            potential_valid.push((device_id, submission_id, raw.message));
        }

        if potential_valid.is_empty() {
            return Ok(SubmissionOutcome { failed_submissions });
        }

        // Pass 2: Business Validation (Device Existence)
        let mut tx = self.pool.begin().await?;
        let check_ids: Vec<Uuid> = device_ids_to_check.into_iter().collect();
        let valid_devices_set: std::collections::HashSet<Uuid> =
            self.repo.check_devices_exist(&mut tx, &check_ids).await?.into_iter().collect();

        let mut to_insert = Vec::with_capacity(potential_valid.len());
        for (d_id, s_id, msg) in potential_valid {
            if valid_devices_set.contains(&d_id) {
                to_insert.push((d_id, s_id, msg));
            } else {
                failed_submissions.push(FailedSubmission {
                    submission_id: s_id.as_bytes().to_vec(),
                    error_code: SubmissionErrorCode::InvalidDevice,
                    error_message: "Device not found".to_string(),
                });
            }
        }

        // Pass 3: Bulk Insert
        if !to_insert.is_empty() {
            let inserted = self.repo.create_batch(&mut tx, sender_id, to_insert, self.ttl_days).await?;
            tx.commit().await?;

            self.metrics.sent_total.add(inserted.len() as u64, &[KeyValue::new("status", "success")]);

            // Notify target devices
            let inserted_device_ids: Vec<Uuid> = inserted.into_iter().map(|(id, _)| id).collect();
            self.notifier.notify(&inserted_device_ids, UserEvent::MessageReceived).await;
        }

        Ok(SubmissionOutcome { failed_submissions })
    }

    /// Fetches a batch of pending messages for a device.
    ///
    /// # Errors
    /// Returns `AppError::Database` if the query fails.
    #[tracing::instrument(
        err(level = "warn"),
        skip(self),
        fields(device.id = %device_id, batch_limit = %limit)
    )]
    pub(crate) async fn fetch_pending_batch(
        &self,
        device_id: Uuid,
        cursor: Option<(time::OffsetDateTime, Uuid)>,
        limit: i64,
    ) -> Result<Vec<Message>> {
        let mut conn = self.pool.acquire().await?;
        let messages = self.repo.fetch_pending_batch(&mut conn, device_id, cursor, limit).await?;

        self.metrics.fetch_batch_size.record(messages.len() as u64, &[]);

        Ok(messages)
    }

    /// Deletes a batch of messages.
    ///
    /// # Errors
    /// Returns `AppError::Database` if the deletion fails.
    #[tracing::instrument(
        err,
        skip(self),
        fields(batch_count = message_ids.len())
    )]
    pub(crate) async fn delete_batch(&self, device_id: Uuid, message_ids: &[Uuid]) -> Result<()> {
        let mut conn = self.pool.acquire().await?;
        self.repo.delete_batch(&mut conn, device_id, message_ids).await
    }
}