rustpatcher 0.2.2

distributed patching system for single binary applications
Documentation
use actor_helper::{Action, Actor, Handle, Receiver, act_ok};
use distributed_topic_tracker::{RecordPublisher, unix_minute};
use iroh::EndpointId;
use tracing::{debug, error, warn};

use crate::{Patch, PatchInfo, Version};

#[derive(Debug, Clone)]
pub struct Publisher {
    api: Handle<PublisherActor, anyhow::Error>,
}

#[derive(Debug, Clone)]
pub enum PublisherState {
    Publishing,
    NewerAvailable,
}

#[derive(Debug)]
struct PublisherActor {
    rx: Receiver<Action<PublisherActor>>,
    state: PublisherState,

    interval: tokio::time::Interval,
    self_patch: Patch,
    record_publisher: RecordPublisher,

    update_starter: tokio::sync::mpsc::Sender<()>,
}

impl Publisher {
    pub fn new(
        record_publisher: RecordPublisher,
        update_starter: tokio::sync::mpsc::Sender<()>,
    ) -> anyhow::Result<Self> {
        let self_patch = Patch::from_self()?;
        let (api, rx) = Handle::channel();
        tokio::spawn(async move {
            let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(55));
            interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

            let mut actor = PublisherActor {
                rx,
                state: PublisherState::Publishing,
                interval,
                self_patch,
                record_publisher,
                update_starter,
            };
            if let Err(e) = actor.run().await {
                error!("VersionPublisher actor error: {:?}", e);
            }
        });
        Ok(Self { api })
    }

    pub async fn get_record_publisher(&self) -> anyhow::Result<RecordPublisher> {
        self.api
            .call(act_ok!(actor => async move {
                actor.record_publisher.clone()
            }))
            .await
    }
}

impl Actor<anyhow::Error> for PublisherActor {
    async fn run(&mut self) -> anyhow::Result<()> {
        loop {
            tokio::select! {
                Ok(action) = self.rx.recv_async() => {
                    action(self).await
                }
                _ = self.interval.tick(), if matches!(self.state, PublisherState::Publishing) => {
                    let now = unix_minute(0);
                    let mut records = self.record_publisher.get_records(now).await;
                    records.extend(self.record_publisher.get_records(now-1).await);

                    warn!("Checked for records, found {} records", records.len());
                    let c_version = Version::current()?;
                    let newer_patch_infos = records
                        .iter()
                        .filter_map(|r| if let Ok(patch_info) = r.content::<PatchInfo>(){
                            if let Ok(endpoint_id) = EndpointId::from_bytes(&r.node_id()) {
                                warn!("Found patch info: {:?}{:?}", endpoint_id,patch_info);
                                Some((endpoint_id,patch_info.clone()))
                            } else {
                                None
                            }
                        } else {
                            None
                        })
                        .filter(|(_,p)| p.version > c_version)
                        .collect::<Vec<(EndpointId, PatchInfo)>>();

                    warn!("Checked for updates, found {} newer versions", newer_patch_infos.len());
                    if newer_patch_infos.is_empty() {
                        let res = self.publish_self(now).await;
                        debug!("Published self: {:?}", res);
                        continue;
                    }
                    self.state = PublisherState::NewerAvailable;
                    let _ = self.update_starter.send(()).await;
                }
            }
        }
    }
}

impl PublisherActor {
    async fn publish_self(&mut self, unix_minute: u64) -> anyhow::Result<()> {
        let record = self
            .record_publisher
            .new_record(unix_minute, self.self_patch.info().clone())?;
        self.record_publisher.publish_record(record).await
    }
}