use anyhow::{Context, Result};
use chrono::Utc;
use reqwest::blocking::Client as BlockingClient;
use reqwest::Url;
use serde::{Deserialize, Serialize};
use std::path::Path;
use crate::syftbox::app::SyftBoxApp;
use crate::syftbox::endpoint::Endpoint;
use crate::syftbox::storage::WritePolicy;
use crate::syftbox::types::{RpcRequest, RpcResponse};
use syftbox_sdk::{has_syc_magic, parse_envelope};
use super::db::MessageDb;
use super::models::{
DecryptionFailureReason, FailedMessage, Message, MessageStatus, MessageType, SyncStatus,
};
#[derive(Debug, Serialize, Deserialize)]
struct MessagePayload {
message_id: String,
thread_id: Option<String>,
parent_id: Option<String>,
from: String,
to: String,
subject: Option<String>,
body: String,
message_type: String,
metadata: Option<serde_json::Value>,
created_at: String,
}
pub struct MessageSync {
db: MessageDb,
app: SyftBoxApp,
}
#[derive(Debug, Deserialize)]
struct SendHandlerResponse {
request_id: String,
}
impl MessageSync {
pub fn new(db_path: &Path, app: SyftBoxApp) -> Result<Self> {
let db = MessageDb::new(db_path)?;
Ok(Self { db, app })
}
fn send_via_send_handler(&self, from: &str, to: &str, body: &[u8]) -> Result<String> {
let server_url = std::env::var("SYFTBOX_SERVER_URL")
.context("SYFTBOX_SERVER_URL is required to send messages via the server")?;
let server_url = server_url.trim().trim_end_matches('/');
if server_url.is_empty() {
anyhow::bail!("SYFTBOX_SERVER_URL is empty");
}
let target_syft_url = format!("syft://{}/app_data/biovault/rpc/message", to);
let mut url = Url::parse(&format!("{}/api/v1/send/msg", server_url))
.context("Failed to build send handler URL")?;
url.query_pairs_mut()
.append_pair("x-syft-url", &target_syft_url)
.append_pair("x-syft-from", from);
let client = BlockingClient::builder()
.build()
.context("Failed to build HTTP client")?;
let resp = client
.post(url)
.header(reqwest::header::CONTENT_TYPE, "application/json")
.body(body.to_vec())
.send()
.context("Failed to send message via send handler")?;
let status = resp.status();
let resp_text = resp
.text()
.context("Failed to read send handler response body")?;
if status.is_client_error() || status.is_server_error() {
anyhow::bail!("Send handler error {}: {}", status, resp_text);
}
let parsed: SendHandlerResponse = serde_json::from_str(&resp_text)
.with_context(|| format!("Failed to parse send handler response: {}", resp_text))?;
Ok(parsed.request_id)
}
pub fn send_message(&self, message_id: &str) -> Result<()> {
let mut msg = self
.db
.get_message(message_id)?
.ok_or_else(|| anyhow::anyhow!("Message not found: {}", message_id))?;
let payload = MessagePayload {
message_id: msg.id.clone(),
thread_id: msg.thread_id.clone(),
parent_id: msg.parent_id.clone(),
from: msg.from.clone(),
to: msg.to.clone(),
subject: msg.subject.clone(),
body: msg.body.clone(),
message_type: msg.message_type.to_string(),
metadata: msg.metadata.clone(),
created_at: msg.created_at.to_rfc3339(),
};
let payload_bytes = serde_json::to_vec(&payload)?;
msg.sync_status = SyncStatus::Syncing;
msg.status = MessageStatus::Sent;
msg.sent_at = Some(Utc::now());
self.db.update_message(&msg)?;
let mut mode = "send-handler";
let send_result: Result<String> = match std::env::var("SYFTBOX_SERVER_URL") {
Ok(url) if !url.trim().is_empty() => {
self.send_via_send_handler(&msg.from, &msg.to, &payload_bytes)
}
_ => {
mode = "filesystem";
let rpc_request = RpcRequest::new(
msg.from.clone(),
format!("syft://{}/app_data/biovault/rpc/message", msg.to),
"POST".to_string(),
payload_bytes.clone(),
);
let recipient_rpc_dir = self
.app
.data_dir
.join("datasites")
.join(&msg.to)
.join("app_data")
.join("biovault")
.join("rpc")
.join("message");
self.app
.storage
.ensure_dir(&recipient_rpc_dir)
.with_context(|| {
format!("Failed to prepare RPC dir {:?}", recipient_rpc_dir)
})?;
let request_path = recipient_rpc_dir.join(format!("{}.request", rpc_request.id));
let write_policy = WritePolicy::Envelope {
recipients: vec![msg.to.clone()],
hint: Some(format!("message-{}", rpc_request.id)),
};
self.app.storage.write_json_with_shadow(
&request_path,
&rpc_request,
write_policy,
true,
)?;
Ok(rpc_request.id)
}
};
match send_result {
Ok(request_id) => {
msg.rpc_request_id = Some(request_id.clone());
self.db.update_message(&msg)?;
println!("📤 Message sent to {} ({})", msg.to, mode);
Ok(())
}
Err(e) => {
msg.sync_status = SyncStatus::Failed;
let _ = self.db.update_message(&msg);
Err(e)
}
}
}
pub fn check_incoming(&self, no_cleanup: bool) -> Result<Vec<String>> {
let (new_message_ids, _new_failed) = self.check_incoming_with_failures(no_cleanup)?;
Ok(new_message_ids)
}
pub fn check_incoming_with_failures(&self, no_cleanup: bool) -> Result<(Vec<String>, usize)> {
let endpoint = Endpoint::new(&self.app, "/message")?;
let (requests, failures) = endpoint.check_requests_with_failures()?;
let mut new_message_ids = Vec::new();
let mut new_failed_count = 0;
for (request_path, rpc_request) in requests {
let body_bytes = match rpc_request.decode_body() {
Ok(b) => b,
Err(e) => {
eprintln!("Failed to decode request body: {}", e);
continue;
}
};
let payload: MessagePayload = match serde_json::from_slice(&body_bytes) {
Ok(p) => p,
Err(e) => {
eprintln!("Failed to parse message payload: {}", e);
continue;
}
};
let mut msg = Message::new(payload.from.clone(), self.app.email.clone(), payload.body);
msg.id = payload.message_id;
msg.thread_id = payload.thread_id;
msg.parent_id = payload.parent_id;
msg.subject = payload.subject;
msg.message_type = match payload.message_type.as_str() {
"project" => MessageType::Project {
project_name: String::new(),
submission_id: String::new(),
files_hash: None,
},
"request" => MessageType::Request {
request_type: String::new(),
params: None,
},
_ => MessageType::Text,
};
msg.metadata = payload.metadata;
msg.status = MessageStatus::Received;
msg.sync_status = SyncStatus::Synced;
msg.received_at = Some(Utc::now());
if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(&payload.created_at) {
msg.created_at = dt.with_timezone(&Utc);
}
if self.db.get_message(&msg.id)?.is_none() {
self.db.insert_message(&msg)?;
new_message_ids.push(msg.id.clone());
}
let _ = self.db.delete_failed_message_by_rpc_id(&rpc_request.id);
let ack_response = RpcResponse::new(
&rpc_request,
self.app.email.clone(),
200,
b"Message received".to_vec(),
);
let _ = endpoint.send_response(&request_path, &rpc_request, &ack_response, no_cleanup);
}
for (request_path, error_msg, raw_bytes) in failures {
let rpc_request_id = request_path
.file_stem()
.and_then(|s| s.to_str())
.unwrap_or("unknown")
.to_string();
if self
.db
.get_failed_message_by_rpc_id(&rpc_request_id)?
.is_some()
{
continue;
}
eprintln!(
"⚠️ Failed to decrypt/parse incoming message (rpc_id={}): {}",
rpc_request_id, error_msg
);
let (
sender_identity,
sender_fingerprint,
recipient_fingerprint,
filename_hint,
failure_reason,
) = self.extract_envelope_metadata(&raw_bytes, &error_msg);
let mut failed = FailedMessage::new(
request_path.to_string_lossy().to_string(),
rpc_request_id,
sender_identity,
sender_fingerprint,
failure_reason,
error_msg,
);
failed.recipient_identity = Some(self.app.email.clone());
failed.recipient_fingerprint = recipient_fingerprint;
failed.filename_hint = filename_hint;
self.db.insert_failed_message(&failed)?;
new_failed_count += 1;
}
Ok((new_message_ids, new_failed_count))
}
fn extract_envelope_metadata(
&self,
raw_bytes: &[u8],
error_msg: &str,
) -> (
String,
String,
Option<String>,
Option<String>,
DecryptionFailureReason,
) {
let mut sender_identity = "unknown".to_string();
let mut sender_fingerprint = "unknown".to_string();
let mut recipient_fingerprint = None;
let mut filename_hint = None;
let failure_reason = if error_msg.contains("sender bundle not cached")
|| error_msg.contains("no cached bundle")
{
DecryptionFailureReason::SenderBundleNotCached
} else if error_msg.contains("recipient")
|| error_msg.contains("not addressed")
|| error_msg.contains("wrong recipient")
{
DecryptionFailureReason::WrongRecipient
} else if error_msg.contains("decrypt") || error_msg.contains("cipher") {
DecryptionFailureReason::DecryptionFailed
} else if error_msg.contains("fingerprint") || error_msg.contains("key mismatch") {
DecryptionFailureReason::RecipientKeyMismatch
} else {
DecryptionFailureReason::Other(error_msg.to_string())
};
if has_syc_magic(raw_bytes) {
if let Ok(parsed) = parse_envelope(raw_bytes) {
sender_identity = parsed.prelude.sender.identity.clone();
sender_fingerprint = parsed.prelude.sender.ik_fingerprint.clone();
if let Some(wrapping) = parsed.prelude.wrappings.first() {
if let Some(ref ri) = wrapping.recipient_identity {
let _ = ri;
}
}
if let Some(recipient) = parsed.prelude.recipients.first() {
if let Some(ref spk_fp) = recipient.spk_fingerprint {
recipient_fingerprint = Some(spk_fp.clone());
}
}
if let Some(ref meta) = parsed.prelude.public_meta {
filename_hint = meta.filename_hint.clone();
}
}
}
(
sender_identity,
sender_fingerprint,
recipient_fingerprint,
filename_hint,
failure_reason,
)
}
pub fn check_acks(&self, no_cleanup: bool) -> Result<()> {
let endpoint = Endpoint::new(&self.app, "/message")?;
let responses = endpoint.check_responses()?;
for (response_path, rpc_response) in responses {
let request_id = &rpc_response.id;
let messages = self.db.list_messages(None)?;
for mut msg in messages {
if msg.rpc_request_id.as_ref() == Some(request_id) {
msg.rpc_ack_status = Some(rpc_response.status_code as i32);
msg.rpc_ack_at = Some(Utc::now());
msg.sync_status = if rpc_response.status_code == 200 {
SyncStatus::Synced
} else {
SyncStatus::Failed
};
self.db.update_message(&msg)?;
break;
}
}
if !no_cleanup {
endpoint.cleanup_response(&response_path)?;
}
}
Ok(())
}
pub fn sync(&self, no_cleanup: bool) -> Result<()> {
let (new_message_ids, new_failed_count) = self.check_incoming_with_failures(no_cleanup)?;
if !new_message_ids.is_empty() {
println!("📬 {} new message(s) received", new_message_ids.len());
}
if new_failed_count > 0 {
eprintln!(
"⚠️ Recorded {} message decryption failure(s) (see failed_messages table).",
new_failed_count
);
}
self.check_acks(no_cleanup)?;
Ok(())
}
pub fn sync_quiet(&self) -> Result<(Vec<String>, usize)> {
let (new_messages, _count, _failed_count) = self.sync_quiet_with_failures()?;
Ok((new_messages.clone(), new_messages.len()))
}
pub fn sync_quiet_with_failures(&self) -> Result<(Vec<String>, usize, usize)> {
let (new_messages, failed_count) = self.check_incoming_with_failures(false)?;
self.check_acks(false)?;
Ok((new_messages.clone(), new_messages.len(), failed_count))
}
pub fn list_failed_messages(&self, include_dismissed: bool) -> Result<Vec<FailedMessage>> {
self.db.list_failed_messages(include_dismissed)
}
pub fn count_failed_messages(&self) -> Result<usize> {
self.db.count_failed_messages()
}
pub fn dismiss_failed_message(&self, id: &str) -> Result<bool> {
self.db.dismiss_failed_message(id)
}
pub fn delete_failed_message(&self, id: &str) -> Result<bool> {
self.db.delete_failed_message(id)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::messages::models::Message;
use crate::syftbox::storage::SyftBoxStorage;
use tempfile::TempDir;
fn list_response_files(
storage: &SyftBoxStorage,
dir: &std::path::Path,
) -> Vec<std::path::PathBuf> {
storage
.list_dir(dir)
.unwrap_or_default()
.into_iter()
.filter(|p| p.extension().and_then(|s| s.to_str()) == Some("response"))
.collect()
}
#[test]
fn send_receive_and_ack_flow_via_fs() {
let tmp = TempDir::new().unwrap();
let data_dir = tmp.path();
let app_sender = SyftBoxApp::new(data_dir, "alice@example.com", "biovault").unwrap();
let app_recipient = SyftBoxApp::new(data_dir, "bob@example.com", "biovault").unwrap();
let db_sender = tmp.path().join("sender.sqlite");
let db_recipient = tmp.path().join("recipient.sqlite");
let ms_sender = MessageSync::new(&db_sender, app_sender.clone()).unwrap();
let ms_recipient = MessageSync::new(&db_recipient, app_recipient.clone()).unwrap();
let m = Message::new(
"alice@example.com".into(),
"bob@example.com".into(),
"hello".into(),
);
ms_sender.db.insert_message(&m).unwrap();
ms_sender.send_message(&m.id).unwrap();
let new_msgs = ms_recipient.check_incoming(false).unwrap();
assert_eq!(new_msgs.len(), 1);
let recipient_ep = app_recipient.endpoint_path("/message");
let sender_ep = app_sender.endpoint_path("/message");
app_sender.storage.ensure_dir(&sender_ep).unwrap();
for resp in list_response_files(&app_recipient.storage, &recipient_ep) {
let file_name = resp.file_name().unwrap();
let dest = sender_ep.join(file_name);
app_sender.storage.copy_raw_file(&resp, &dest).unwrap();
}
ms_sender.check_acks(false).unwrap();
}
#[test]
fn ack_failure_sets_failed_status() {
let tmp = TempDir::new().unwrap();
let data_dir = tmp.path();
let app_sender = SyftBoxApp::new(data_dir, "alice@example.com", "biovault").unwrap();
let db_sender = tmp.path().join("sender.sqlite");
let ms_sender = MessageSync::new(&db_sender, app_sender.clone()).unwrap();
let m = Message::new(
"alice@example.com".into(),
"bob@example.com".into(),
"hello".into(),
);
ms_sender.db.insert_message(&m).unwrap();
ms_sender.send_message(&m.id).unwrap();
let endpoint = Endpoint::new(&app_sender, "/message").unwrap();
let req_id = ms_sender
.db
.get_message(&m.id)
.unwrap()
.unwrap()
.rpc_request_id
.unwrap();
let dummy_req = RpcRequest::new(
"x".into(),
app_sender.build_syft_url("/message"),
"POST".into(),
b"{}".to_vec(),
);
let mut resp = RpcResponse::new(&dummy_req, "bob@example.com".into(), 500, b"err".to_vec());
resp.id = req_id.clone();
let resp_path = endpoint.path.join(format!("{}.response", req_id));
app_sender
.storage
.write_plaintext_file(
&resp_path,
serde_json::to_string_pretty(&resp).unwrap().as_bytes(),
true,
)
.unwrap();
ms_sender.check_acks(false).unwrap();
let updated = ms_sender.db.get_message(&m.id).unwrap().unwrap();
assert_eq!(updated.sync_status, SyncStatus::Failed);
assert!(updated.rpc_ack_at.is_some());
}
}