rustpatcher 0.2.2

Distributed patching system for single-binary applications.
Documentation
use std::{str::FromStr, sync::Mutex};

use actor_helper::{Action, Actor, Handle, Receiver};
use distributed_topic_tracker::{RecordPublisher, RecordTopic};
use ed25519_dalek::SigningKey;
use iroh::{Endpoint, protocol::Router};
use once_cell::sync::OnceCell;
use sha2::Digest;
use tracing::{error, warn};

use crate::{Distributor, Publisher, Updater, UpdaterMode};

/// Process-wide slot holding the patcher, filled by [`spawn`].
///
/// The inner `Option` is `Some` when `spawn` built a real [`Patcher`]
/// (release builds) and `None` when `spawn` skipped initialization
/// (debug builds).
static PATCHER: OnceCell<Mutex<Option<Patcher>>> = OnceCell::new();

/// Initializes the global patcher unless it has already been set.
///
/// * Release builds (`debug_assertions` off): builds a [`Patcher`] configured
///   with `update_mode` and stores it in the global slot.
/// * Debug builds: logs a warning, ignores `update_mode`, and stores `None`.
///
/// Calling this again after the slot has been filled is a no-op.
///
/// # Errors
///
/// In release builds, returns any error produced by [`Builder::build`].
pub async fn spawn(update_mode: UpdaterMode) -> anyhow::Result<()> {
    // Both build flavours share the same "already initialized" guard.
    if PATCHER.get().is_some() {
        return Ok(());
    }

    #[cfg(not(debug_assertions))]
    let patcher = Some(Patcher::builder().updater_mode(update_mode).build().await?);

    #[cfg(debug_assertions)]
    let patcher = {
        let _ = update_mode;
        warn!("Skipping rustpatcher initialization in debug build");
        None
    };

    // A failed `set` means the slot was filled in the meantime; keep the first value.
    let _ = PATCHER.set(Mutex::new(patcher));
    Ok(())
}

/// Configures and constructs a [`Patcher`].
#[derive(Debug, Clone)]
pub struct Builder {
    // Mode forwarded to the patcher actor, which passes it to `Updater::new`
    // when an update notification arrives.
    updater_mode: UpdaterMode,
}

impl Default for Builder {
    /// Returns a builder whose updater mode is [`UpdaterMode::Now`].
    fn default() -> Self {
        let updater_mode = UpdaterMode::Now;
        Self { updater_mode }
    }
}

impl Builder {
    #[cfg_attr(debug_assertions, allow(dead_code))]
    pub fn updater_mode(mut self, mode: UpdaterMode) -> Self {
        self.updater_mode = mode;
        self
    }

    #[cfg_attr(debug_assertions, allow(dead_code))]
    pub async fn build(self) -> anyhow::Result<Patcher> {
        let secret_key = iroh::SecretKey::generate(&mut rand::rng());
        let signing_key = SigningKey::from_bytes(&secret_key.to_bytes());

        let topic_id = RecordTopic::from_str(
            format!(
                "rustpatcher:{}",
                z32::encode(crate::embed::get_owner_pub_key().as_bytes())
            )
            .as_str(),
        )?;
        let mut hash = sha2::Sha512::new();
        hash.update(topic_id.hash());
        hash.update("v1");
        let initial_secret = hash.finalize().to_vec();

        let record_publisher = RecordPublisher::new(
            topic_id,
            signing_key.verifying_key(),
            signing_key,
            None,
            initial_secret,
        );

        let (update_starter, update_receiver) = tokio::sync::mpsc::channel(1);
        let publisher = Publisher::new(record_publisher, update_starter)?;

        let endpoint = Endpoint::builder()
            .secret_key(secret_key.clone())
            .bind()
            .await?;

        let distributor = Distributor::new(endpoint.clone())?;

        let _router = iroh::protocol::Router::builder(endpoint.clone())
            .accept(Distributor::ALPN(), distributor.clone())
            .spawn();

        Ok(Patcher::new(
            publisher,
            self.updater_mode,
            update_receiver,
            distributor,
            endpoint,
            _router,
        ))
    }
}

/// Owner-side handle for the background patcher actor.
///
/// Created through [`Patcher::builder`]; the actor itself runs in a task
/// spawned by the private constructor.
#[derive(Debug, Clone)]
pub struct Patcher {
    // Handle half of the channel whose receiving half is owned by the actor.
    _api: Handle<PatcherActor, anyhow::Error>,
}

/// State owned by the patcher's background task.
#[derive(Debug)]
struct PatcherActor {
    // Incoming actions; each one received is executed against this actor.
    rx: Receiver<Action<PatcherActor>>,

    // Source of the `RecordPublisher` handed to the updater.
    publisher: Publisher,
    // `None` until the first update notification starts an `Updater`.
    updater: Option<Updater>,
    // Mode passed to `Updater::new`.
    updater_mode: UpdaterMode,
    // Cloned into the updater when it is created.
    distributor: Distributor,

    // Stored but never read in this file.
    // NOTE(review): presumably held only to keep the endpoint and router
    // alive for the actor's lifetime; confirm.
    _endpoint: Endpoint,
    _router: Router,

    // Receives update notifications; the sending half is given to the publisher.
    update_receiver: tokio::sync::mpsc::Receiver<()>,
}

impl Patcher {
    #[cfg_attr(debug_assertions, allow(dead_code))]
    pub fn builder() -> Builder {
        Builder::default()
    }

    fn new(
        publisher: Publisher,
        updater_mode: UpdaterMode,
        update_receiver: tokio::sync::mpsc::Receiver<()>,
        distributor: Distributor,
        endpoint: Endpoint,
        router: Router,
    ) -> Self {
        let (api, rx) = Handle::channel();
        tokio::spawn(async move {
            let mut actor = PatcherActor {
                rx,
                publisher,
                updater: None,
                _endpoint: endpoint,
                _router: router,
                updater_mode,
                update_receiver,
                distributor,
            };
            if let Err(e) = actor.run().await {
                error!("Patcher actor error: {:?}", e);
            }
        });

        Self { _api: api }
    }
}

impl Actor<anyhow::Error> for PatcherActor {
    /// Drives the actor until it has nothing left to wait on.
    ///
    /// Each iteration waits for whichever comes first:
    /// * an action from the handle, which is executed against this actor, or
    /// * an update notification from the publisher. This is only listened
    ///   for while no updater exists, and it starts the [`Updater`].
    ///
    /// Returns `Ok(())` once neither source can deliver anything any more.
    ///
    /// # Errors
    ///
    /// Fails if the `RecordPublisher` cannot be obtained from the publisher
    /// when an update notification arrives.
    async fn run(&mut self) -> anyhow::Result<()> {
        loop {
            tokio::select! {
                Ok(action) = self.rx.recv_async() => {
                    action(self).await
                }
                Some(_) = self.update_receiver.recv(), if self.updater.is_none() => {
                    warn!("update notification received from Publisher, starting Updater");
                    let Ok(record_publisher) = self.publisher.get_record_publisher().await else {
                        anyhow::bail!("Failed to get RecordPublisher for Updater");
                    };
                    self.updater = Some(Updater::new(
                        self.updater_mode.clone(),
                        self.distributor.clone(),
                        record_publisher,
                    ));
                }
                // Every branch is disabled: the action channel is closed and
                // update notifications are either no longer awaited or their
                // channel is closed too. Without this arm `select!` would
                // panic, so shut down cleanly instead.
                else => return Ok(()),
            }
        }
    }
}