use std::cell::RefCell;
use std::collections::VecDeque;
use std::rc::Rc;
use serde::{Deserialize, Serialize};
use wasm_bindgen::JsValue;
use web_sys::RtcDataChannel;
use crate::sharedfs_reconcile::FileMeta;
use super::shared_fs;
use super::webrtc::Peer;
/// Returns the Keccak-256 digest of `bytes` as an owned byte vector.
fn content_hash(bytes: &[u8]) -> Vec<u8> {
    use sha3::{Digest, Keccak256};

    let digest = Keccak256::digest(bytes);
    digest.iter().copied().collect()
}
/// Builds the manifest of the local shared filesystem.
///
/// Every entry reported by `shared_fs::apex_list` is read back and hashed with
/// `content_hash`. Entries whose read fails or yields `None` are left out of
/// the manifest rather than reported as errors.
async fn local_manifest() -> Vec<FileMeta> {
    let mut manifest = Vec::new();
    for file in shared_fs::apex_list().await {
        let Ok(Some(contents)) = shared_fs::apex_read(&file.name).await else {
            // Unreadable or missing entry: skip it.
            continue;
        };
        manifest.push(FileMeta::new(file.name, content_hash(&contents)));
    }
    manifest
}
/// Shared, interior-mutable slot holding the outgoing data channel.
///
/// The slot starts out as `None` and is filled in once the peer has been
/// created; sending while it is still `None` is a no-op (see `send_msg`).
type Tx = Rc<RefCell<Option<RtcDataChannel>>>;
/// Queue of decoded incoming messages plus a flag marking whether a drain
/// task is currently running.
///
/// Cloning is cheap: both fields are reference-counted handles, so every
/// clone observes the same queue and the same flag.
#[derive(Clone)]
struct Inbox {
    /// Messages waiting to be processed, in arrival order.
    queue: Rc<RefCell<VecDeque<SyncMsg>>>,
    /// `true` while a drain task owns the queue.
    draining: Rc<RefCell<bool>>,
}

impl Inbox {
    /// Creates an empty inbox with no drain task running.
    fn new() -> Self {
        // `Default` gives an empty `VecDeque` and `false` respectively.
        Self {
            queue: Rc::default(),
            draining: Rc::default(),
        }
    }
}
/// Messages exchanged between two sync peers.
///
/// Values are encoded with `serde_json` on send and decoded with it on
/// receipt, so variant and field names are part of the wire format.
#[derive(Serialize, Deserialize)]
enum SyncMsg {
/// The sender's file list as `(name, content hash)` pairs.
Manifest(Vec<(String, Vec<u8>)>),
/// Request for the contents of the named file.
Want(String),
/// Contents of a file; the receiver writes `data` under `name`.
File { name: String, data: Vec<u8> },
}
/// Serializes `msg` as JSON and sends it over the channel stored in `tx`.
///
/// This is best-effort: a serialization failure, an empty slot (no channel
/// installed yet) and a failed send are all silently ignored.
fn send_msg(tx: &Tx, msg: &SyncMsg) {
    let payload = match serde_json::to_vec(msg) {
        Ok(payload) => payload,
        Err(_) => return,
    };

    let slot = tx.borrow();
    if let Some(channel) = slot.as_ref() {
        let _ = channel.send_with_u8_array(&payload);
    }
}
/// Entry point for raw bytes received from the peer.
///
/// Bytes that do not decode as a JSON `SyncMsg` are dropped. A decoded message
/// is appended to `inbox` and a drain is requested so it gets processed.
fn handle_message(bytes: Vec<u8>, tx: Tx, inbox: Inbox) {
    if let Ok(decoded) = serde_json::from_slice::<SyncMsg>(&bytes) {
        inbox.queue.borrow_mut().push_back(decoded);
        drain(tx, inbox);
    }
}
/// Ensures a task is processing the inbox queue, spawning one if needed.
///
/// At most one drain task runs per inbox: if the `draining` flag is already
/// set this returns immediately, otherwise it sets the flag and spawns a local
/// task that processes queued messages one at a time, in order, and clears the
/// flag once the queue is empty.
fn drain(tx: Tx, inbox: Inbox) {
    // Test-and-set: `replace` stores `true` and hands back the previous value.
    if inbox.draining.replace(true) {
        return;
    }

    wasm_bindgen_futures::spawn_local(async move {
        loop {
            // Pop in its own statement so the queue borrow is released before
            // the `.await` below; new messages may be pushed while we wait.
            let next = inbox.queue.borrow_mut().pop_front();
            match next {
                Some(msg) => process_message(msg, &tx).await,
                None => break,
            }
        }
        *inbox.draining.borrow_mut() = false;
    });
}
/// Handles one decoded sync message, replying over `tx` where needed.
///
/// * `Manifest` — reconcile the peer's file list against the local one.
/// * `Want` — reply with a `File` message if the named file can be read;
///   otherwise send nothing.
/// * `File` — write the received data under the given name.
///
/// Read and write failures are ignored throughout.
async fn process_message(msg: SyncMsg, tx: &Tx) {
    match msg {
        SyncMsg::Manifest(entries) => reconcile_manifest(entries, tx).await,
        SyncMsg::Want(name) => {
            let found = shared_fs::apex_read(&name).await;
            if let Ok(Some(data)) = found {
                send_msg(tx, &SyncMsg::File { name, data });
            }
        }
        SyncMsg::File { name, data } => {
            // NOTE(review): the payload is stored as received; nothing here
            // checks it against a prior `Want` or a manifest hash.
            let _ = shared_fs::apex_write(&name, &data).await;
        }
    }
}

/// Compares the peer's manifest with the local one and acts on the plan
/// returned by `plan_pulls`: local renames first, then `Want` requests.
async fn reconcile_manifest(entries: Vec<(String, Vec<u8>)>, tx: &Tx) {
    let ours = local_manifest().await;

    let mut theirs: Vec<FileMeta> = Vec::with_capacity(entries.len());
    for (name, hash) in entries {
        theirs.push(FileMeta::new(name, hash));
    }

    let plan = crate::sharedfs_reconcile::plan_pulls(&ours, &theirs);

    // The content is copied to the new name; the old name is not removed here.
    for (src, dst) in &plan.rename_local {
        let existing = shared_fs::apex_read(src).await;
        if let Ok(Some(contents)) = existing {
            let _ = shared_fs::apex_write(dst, &contents).await;
        }
    }

    plan.want
        .into_iter()
        .map(SyncMsg::Want)
        .for_each(|request| send_msg(tx, &request));
}
pub(crate) struct SharedFsSync {
peer: Peer,
tx: Tx,
}
impl SharedFsSync {
pub(crate) async fn offer() -> Result<(Self, String), JsValue> {
let tx: Tx = Rc::new(RefCell::new(None));
let tx_cb = tx.clone();
let inbox = Inbox::new();
let (peer, sdp) =
Peer::offer(move |bytes| handle_message(bytes, tx_cb.clone(), inbox.clone())).await?;
*tx.borrow_mut() = Some(peer.sender());
Ok((Self { peer, tx }, sdp))
}
pub(crate) async fn answer(offer_sdp: &str) -> Result<(Self, String), JsValue> {
let tx: Tx = Rc::new(RefCell::new(None));
let tx_cb = tx.clone();
let inbox = Inbox::new();
let (peer, sdp) = Peer::answer(offer_sdp, move |bytes| {
handle_message(bytes, tx_cb.clone(), inbox.clone())
})
.await?;
*tx.borrow_mut() = Some(peer.sender());
Ok((Self { peer, tx }, sdp))
}
pub(crate) async fn accept_answer(&self, answer_sdp: &str) -> Result<(), JsValue> {
self.peer.accept_answer(answer_sdp).await
}
pub(crate) async fn start(&self) {
let manifest: Vec<(String, Vec<u8>)> = local_manifest()
.await
.into_iter()
.map(|f| (f.name, f.hash))
.collect();
send_msg(&self.tx, &SyncMsg::Manifest(manifest));
}
pub(crate) fn is_open(&self) -> bool {
self.peer.is_open()
}
}