use std::{str::FromStr, sync::Mutex};
use actor_helper::{Action, Actor, Handle, Receiver};
use distributed_topic_tracker::{RecordPublisher, RecordTopic};
use ed25519_dalek::SigningKey;
use iroh::{Endpoint, protocol::Router};
use once_cell::sync::OnceCell;
use sha2::Digest;
use tracing::{error, warn};
use crate::{Distributor, Publisher, Updater, UpdaterMode};
/// Process-wide patcher slot, written at most once by [`spawn`].
///
/// Holds `Some(Patcher)` when `spawn` ran in a release build and `None` when
/// it ran in a debug build (where initialization is skipped).
static PATCHER: OnceCell<Mutex<Option<Patcher>>> = OnceCell::new();
/// Initializes the global patcher slot if it has not been set yet.
///
/// In release builds (`debug_assertions` off) a [`Patcher`] is built with the
/// given `update_mode` and stored. In debug builds the mode is ignored, a
/// warning is logged, and an empty slot is stored instead.
///
/// Calling this again after the slot is set is a no-op.
///
/// # Errors
///
/// Propagates any error from building the patcher (release builds only).
pub async fn spawn(update_mode: UpdaterMode) -> anyhow::Result<()> {
    // Already initialized: nothing to do in either build profile.
    if PATCHER.get().is_some() {
        return Ok(());
    }

    #[cfg(not(debug_assertions))]
    let slot = {
        let patcher = Patcher::builder().updater_mode(update_mode).build().await?;
        Some(patcher)
    };

    #[cfg(debug_assertions)]
    let slot = {
        // The mode is only consumed in release builds; touch it here so the
        // parameter is not reported as unused.
        let _ = update_mode;
        warn!("Skipping rustpatcher initialization in debug build");
        None
    };

    // A failed `set` means the slot was filled in the meantime; the result
    // is deliberately ignored.
    let _ = PATCHER.set(Mutex::new(slot));
    Ok(())
}
/// Configuration collected before constructing a [`Patcher`] via
/// [`Builder::build`].
#[derive(Debug, Clone)]
pub struct Builder {
/// Mode forwarded to the patcher actor and later passed to `Updater::new`.
updater_mode: UpdaterMode,
}
impl Default for Builder {
    /// Returns a builder whose updater mode is [`UpdaterMode::Now`].
    fn default() -> Self {
        let updater_mode = UpdaterMode::Now;
        Self { updater_mode }
    }
}
impl Builder {
#[cfg_attr(debug_assertions, allow(dead_code))]
pub fn updater_mode(mut self, mode: UpdaterMode) -> Self {
self.updater_mode = mode;
self
}
#[cfg_attr(debug_assertions, allow(dead_code))]
pub async fn build(self) -> anyhow::Result<Patcher> {
let secret_key = iroh::SecretKey::generate(&mut rand::rng());
let signing_key = SigningKey::from_bytes(&secret_key.to_bytes());
let topic_id = RecordTopic::from_str(
format!(
"rustpatcher:{}",
z32::encode(crate::embed::get_owner_pub_key().as_bytes())
)
.as_str(),
)?;
let mut hash = sha2::Sha512::new();
hash.update(topic_id.hash());
hash.update("v1");
let initial_secret = hash.finalize().to_vec();
let record_publisher = RecordPublisher::new(
topic_id,
signing_key.verifying_key(),
signing_key,
None,
initial_secret,
);
let (update_starter, update_receiver) = tokio::sync::mpsc::channel(1);
let publisher = Publisher::new(record_publisher, update_starter)?;
let endpoint = Endpoint::builder()
.secret_key(secret_key.clone())
.bind()
.await?;
let distributor = Distributor::new(endpoint.clone())?;
let _router = iroh::protocol::Router::builder(endpoint.clone())
.accept(Distributor::ALPN(), distributor.clone())
.spawn();
Ok(Patcher::new(
publisher,
self.updater_mode,
update_receiver,
distributor,
endpoint,
_router,
))
}
}
/// Cloneable front for the background `PatcherActor` task started by
/// `Patcher::new`.
#[derive(Debug, Clone)]
pub struct Patcher {
// Handle paired with the actor's action receiver. Not read anywhere in the
// visible code; presumably held so the actor's channel stays open — confirm.
_api: Handle<PatcherActor, anyhow::Error>,
}
/// State owned by the patcher's background task; driven by its `run` loop.
#[derive(Debug)]
struct PatcherActor {
/// Incoming actions; `run` invokes each one with `&mut self`.
rx: Receiver<Action<PatcherActor>>,
/// Queried for a record publisher when an update notification arrives.
publisher: Publisher,
/// `None` until the first update notification is handled, then the running updater.
updater: Option<Updater>,
/// Mode passed to `Updater::new` when the updater is created.
updater_mode: UpdaterMode,
/// Cloned into the updater when it is created.
distributor: Distributor,
// Not read in the visible code; presumably stored so the endpoint lives as
// long as the actor — confirm.
_endpoint: Endpoint,
// Not read in the visible code; presumably stored so the router lives as
// long as the actor — confirm.
_router: Router,
/// Update notifications; the sending half is handed to `Publisher::new` in `Builder::build`.
update_receiver: tokio::sync::mpsc::Receiver<()>,
}
impl Patcher {
#[cfg_attr(debug_assertions, allow(dead_code))]
pub fn builder() -> Builder {
Builder::default()
}
fn new(
publisher: Publisher,
updater_mode: UpdaterMode,
update_receiver: tokio::sync::mpsc::Receiver<()>,
distributor: Distributor,
endpoint: Endpoint,
router: Router,
) -> Self {
let (api, rx) = Handle::channel();
tokio::spawn(async move {
let mut actor = PatcherActor {
rx,
publisher,
updater: None,
_endpoint: endpoint,
_router: router,
updater_mode,
update_receiver,
distributor,
};
if let Err(e) = actor.run().await {
error!("Patcher actor error: {:?}", e);
}
});
Self { _api: api }
}
}
impl Actor<anyhow::Error> for PatcherActor {
    /// Event loop of the patcher actor.
    ///
    /// Each iteration waits for whichever comes first:
    /// - an action from the handle, which is executed against `self`; or
    /// - an update notification, which (only while no updater exists yet)
    ///   fetches the record publisher and starts the `Updater`.
    ///
    /// The loop ends with `Ok(())` once neither source can produce anything
    /// more: the action channel is closed and the update branch is disabled
    /// (updater already running, or the notification sender was dropped).
    ///
    /// # Errors
    ///
    /// Returns an error if the record publisher cannot be obtained when an
    /// update notification is handled.
    async fn run(&mut self) -> anyhow::Result<()> {
        loop {
            tokio::select! {
                Ok(action) = self.rx.recv_async() => {
                    action(self).await;
                }
                // Guarded so that at most one updater is ever started.
                Some(_) = self.update_receiver.recv(), if self.updater.is_none() => {
                    warn!("update notification received from Publisher, starting Updater");
                    let Ok(record_publisher) = self.publisher.get_record_publisher().await else {
                        anyhow::bail!("Failed to get RecordPublisher for Updater");
                    };
                    self.updater = Some(Updater::new(
                        self.updater_mode.clone(),
                        self.distributor.clone(),
                        record_publisher,
                    ));
                }
                // Without this branch `select!` panics when every other
                // branch is disabled (closed action channel plus a disabled
                // update branch). Exit cleanly instead.
                else => break,
            }
        }
        Ok(())
    }
}