// signer-daemon 0.2.5
//
// Signer daemon package.
// Documentation
use std::{collections::HashMap, fs};

use anyhow::Context as _;
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QueryOrder, QuerySelect};
use signer_core::{SignerSigned, SignerUser, SignerUserPublic};

use crate::{
  entity::{crdt_event, server, user},
  model::viewobject::{CrdtCryptedEventVO, CrdtEventVO, UserVO},
  reqwest_auth::ReqwestAuth,
  signer_daemon::signer_daemon_event::SignerDaemonEvent,
};

use super::SignerDaemon;

impl SignerDaemon {
  /// Persists a batch of CRDT events into the `crdt_event` table.
  ///
  /// Every [`CrdtEventVO`] is converted into a `crdt_event::ActiveModel` and
  /// the whole batch is written with one `insert_many` statement.
  ///
  /// An empty `events` vector is a no-op.
  ///
  /// # Errors
  ///
  /// Returns the database error when the insert statement fails.
  pub async fn crdt_insert_events(
    &self,
    events: Vec<CrdtEventVO>,
  ) -> anyhow::Result<()> {
    // Never issue an INSERT without rows: what sea-orm does with an empty
    // `insert_many` depends on its version and on the backend, so the empty
    // batch is handled here instead of relying on every caller to guard it.
    if events.is_empty() {
      return Ok(());
    }

    let models = events
      .into_iter()
      .map(|i| crdt_event::ActiveModel::from(crdt_event::Model::from(i.into())))
      .collect::<Vec<crdt_event::ActiveModel>>();

    crdt_event::Entity::insert_many(models)
      .exec(&self.db)
      .await?;

    Ok(())
  }

  /// Replays the stored CRDT events in `(clock, peer)` order.
  ///
  /// Steps:
  /// 1. Find the *anchor*: the first event, ordered by clock then peer, whose
  ///    `revert` column is NULL.
  /// 2. If an anchor exists, call `revert` on every event that has a non-NULL
  ///    `revert` column and sorts strictly after the anchor, newest first.
  /// 3. Call `apply` on every event at or after the anchor (or on every event
  ///    when there is no anchor), oldest first.
  /// 4. Emit [`SignerDaemonEvent::SyncUser`].
  ///
  /// # Errors
  ///
  /// Returns an error when a query fails, when reverting or applying an event
  /// fails (the error context names the event), or when the final emit fails.
  pub async fn crdt_apply_all(&self) -> anyhow::Result<()> {
    let anchor = crdt_event::Entity::find()
      .filter(crdt_event::Column::Revert.is_null())
      .order_by_asc(crdt_event::Column::Clock)
      .order_by_asc(crdt_event::Column::Peer)
      .one(&self.db)
      .await
      .context("find anchor failed")?;

    if let Some(anchor) = &anchor {
      // Events with revert data that sort after the anchor are undone in
      // reverse order before the replay below.
      let revert_vec = crdt_event::Entity::find()
        .filter(
          crdt_event::Column::Revert.is_not_null().and(
            crdt_event::Column::Clock.gt(anchor.clock).or(
              crdt_event::Column::Clock
                .eq(anchor.clock)
                .and(crdt_event::Column::Peer.gt(&anchor.peer)),
            ),
          ),
        )
        .order_by_desc(crdt_event::Column::Clock)
        .order_by_desc(crdt_event::Column::Peer)
        .all(&self.db)
        .await
        .context("find revert events failed")?;
      for revert in revert_vec {
        // `with_context` only formats the event when the call actually fails.
        revert
          .revert(&self)
          .await
          .with_context(|| format!("revert event {:?} failed", revert))?;
      }
    }

    // NOTE(review): with no anchor (no row has a NULL `revert`) every stored
    // event is applied again without a prior revert — confirm that `apply` is
    // safe to repeat or that this branch is only reached on an empty table.
    let apply_vec = match anchor {
      Some(anchor) => crdt_event::Entity::find()
        .filter(
          crdt_event::Column::Clock.gt(anchor.clock).or(
            crdt_event::Column::Clock
              .eq(anchor.clock)
              .and(crdt_event::Column::Peer.gte(anchor.peer)),
          ),
        )
        .order_by_asc(crdt_event::Column::Clock)
        .order_by_asc(crdt_event::Column::Peer)
        .all(&self.db)
        .await
        .context("find apply events failed")?,
      None => crdt_event::Entity::find()
        .order_by_asc(crdt_event::Column::Clock)
        .order_by_asc(crdt_event::Column::Peer)
        .all(&self.db)
        .await
        .context("find apply events failed")?,
    };

    for apply in apply_vec {
      apply
        .apply(&self)
        .await
        .with_context(|| format!("apply event {:?} failed", apply))?;
    }

    self.emit(SignerDaemonEvent::SyncUser)?;

    Ok(())
  }

  /// Returns the local frontiers: for every peer present in `crdt_event`, the
  /// highest clock value stored for that peer.
  ///
  /// # Errors
  ///
  /// Returns the database error when the aggregate query fails.
  pub async fn crdt_get_frontiers(
    &self,
  ) -> anyhow::Result<HashMap<String, i32>> {
    // One row per peer: (MAX(clock), peer).
    let rows: Vec<(i32, String)> = crdt_event::Entity::find()
      .select_only()
      .column_as(crdt_event::Column::Clock.max(), "clock")
      .column(crdt_event::Column::Peer)
      .group_by(crdt_event::Column::Peer)
      .into_tuple()
      .all(&self.db)
      .await?;

    // Flip each tuple so the peer becomes the key.
    Ok(
      rows
        .into_iter()
        .map(|(clock, peer)| (peer, clock))
        .collect(),
    )
  }

  /// Loads the events that lie beyond the given `frontiers`.
  ///
  /// An event is selected when its peer is listed in `frontiers` with a
  /// smaller clock than the event's clock, or when its peer is not listed in
  /// `frontiers` at all.
  ///
  /// # Errors
  ///
  /// Returns the database error when the query fails.
  pub async fn crdt_get_event_from_frontiers(
    &self,
    frontiers: HashMap<String, i32>,
  ) -> anyhow::Result<Vec<CrdtEventVO>> {
    // `peer IS NULL` is the starting term of the OR chain; one
    // `(clock > n AND peer = p)` term is appended per known peer.
    let newer_than_known = frontiers.iter().fold(
      crdt_event::Column::Peer.is_null(),
      |acc, (peer, clock)| {
        acc.or(
          crdt_event::Column::Clock
            .gt(*clock)
            .and(crdt_event::Column::Peer.eq(peer)),
        )
      },
    );

    // Events from peers that do not appear in `frontiers` are always included.
    let condition = newer_than_known
      .or(crdt_event::Column::Peer.is_not_in(frontiers.keys()));

    let rows = crdt_event::Entity::find()
      .filter(condition)
      .all(&self.db)
      .await?;

    Ok(rows.into_iter().map(|row| row.into()).collect())
  }

  /// Pulls CRDT events from the remote at `addr` and merges them locally.
  ///
  /// The fetched events are converted with `decrypt` using the current user,
  /// inserted into the local store and then replayed with `crdt_apply_all`.
  /// Nothing is written when the remote returns no events.
  ///
  /// # Errors
  ///
  /// Returns an error when fetching, decrypting, inserting or applying fails.
  /// A remote event that cannot be decrypted is reported as an error instead
  /// of panicking, since the payload comes from the network.
  pub async fn crdt_pull(&self, addr: &str) -> anyhow::Result<()> {
    let u = self.user.lock().await.clone();
    // Pull the CRDT events from the remote so they can be synchronized.
    let remote_events = self
      .fetch_crdt_event(addr)
      .await
      .context("fetch crdt event failed")?;
    let events = remote_events
      .into_iter()
      .map(|e| {
        e.decrypt(&u).map_err(|err| {
          anyhow::anyhow!("decrypt crdt crypted event failed: {:?}", err)
        })
      })
      .collect::<anyhow::Result<Vec<CrdtEventVO>>>()?;

    if !events.is_empty() {
      self
        .crdt_insert_events(events)
        .await
        .context("crdt insert events failed")?;
      self
        .crdt_apply_all()
        .await
        .context("crdt apply all failed")?;
    }

    Ok(())
  }

  /// Pushes local CRDT events to the remote at `addr`.
  ///
  /// The remote frontiers are fetched first; the local events selected by
  /// `crdt_get_event_from_frontiers` are then wrapped into
  /// [`CrdtCryptedEventVO`] values and posted. No request is sent when there
  /// is nothing to push.
  ///
  /// # Errors
  ///
  /// Returns an error when fetching the frontiers, querying local events,
  /// building a crypted event or posting fails. A failure to build a crypted
  /// event is reported as an error instead of panicking.
  pub async fn crdt_push(&self, addr: &str) -> anyhow::Result<()> {
    // Push the local CRDT events to the remote.
    let u = self.user.lock().await.clone();
    let frontiers = self.fetch_crdt_frontiers(addr).await?;
    let local_events = self.crdt_get_event_from_frontiers(frontiers).await?;
    let events = local_events
      .into_iter()
      .map(|e| {
        CrdtCryptedEventVO::from_data(&u, &e).map_err(|err| {
          anyhow::anyhow!("create crdt crypted event failed: {:?}", err)
        })
      })
      .collect::<anyhow::Result<Vec<CrdtCryptedEventVO>>>()?;
    if !events.is_empty() {
      self.push_crdt_event(addr, events).await?;
    }

    Ok(())
  }

  /// Reconciles the user's public record between memory, the user file (when
  /// `user_file_path` is set) and the `user` table.
  ///
  /// The copy with the greatest `update_time` wins. Its `allow_endpoints` is
  /// then compared with the sorted addresses of all rows in the `server`
  /// table; on a mismatch the list is replaced and `update_time` is set to
  /// the current time in milliseconds. Finally every location holding an
  /// older copy (or, for the database, no copy) is overwritten.
  ///
  /// # Errors
  ///
  /// Returns an error when reading or writing the user file, (de)serializing
  /// JSON, signing the record or accessing the database fails.
  pub(crate) async fn crdt_sync_user_public(&self) -> anyhow::Result<()> {
    let mem_up = self.user.lock().await.public.clone();

    // NOTE(review): `std::fs` calls are synchronous and run inside this async
    // function — consider whether that is acceptable for the runtime in use.
    let json_up = match &self.user_file_path {
      Some(path) => {
        let u: SignerUser = serde_json::from_str(&fs::read_to_string(path)?)?;
        Some(u.public)
      }
      None => None,
    };

    let db_up: Option<SignerUserPublic> = user::Entity::find()
      .filter(user::Column::PubKey.eq(&mem_up.pub_key))
      .one(&self.db)
      .await?
      .map(|m| serde_json::from_str::<SignerUserPublic>(&m.user_public))
      .transpose()?;

    // Pick the newest UserPublic among memory, user file and database.
    let mut up: SignerUserPublic = mem_up.clone();
    if let Some(newer) = json_up
      .as_ref()
      .filter(|candidate| candidate.update_time > up.update_time)
    {
      up = newer.clone();
    }
    if let Some(newer) = db_up
      .as_ref()
      .filter(|candidate| candidate.update_time > up.update_time)
    {
      up = newer.clone();
    }

    // Check whether the allowed endpoints need to be refreshed.
    {
      let servers = server::Entity::find().all(&self.db).await?;
      let mut allow_endpoints =
        servers.into_iter().map(|s| s.addr).collect::<Vec<String>>();
      allow_endpoints.sort();

      if allow_endpoints != up.allow_endpoints {
        up.allow_endpoints = allow_endpoints;
        up.update_time = chrono::Utc::now().timestamp_millis();
      }
    }

    // Write the result back to every location that holds an older copy.
    if up.update_time > mem_up.update_time {
      tracing::debug!("已更新内存中的用户信息");
      self.user.lock().await.public = up.clone();
    }

    // The file copy exists only when a path is configured, so both are bound
    // together rather than unwrapping the path.
    if let (Some(path), Some(json_up)) =
      (self.user_file_path.as_ref(), json_up.as_ref())
    {
      if up.update_time > json_up.update_time {
        tracing::debug!("已更新配置文件中的用户信息");
        // Re-read the file so only its `public` part is replaced.
        let mut u: SignerUser =
          serde_json::from_str(&fs::read_to_string(path)?)?;
        u.public = up.clone();
        fs::write(path, serde_json::to_string(&u)?)?;
      }
    }

    let db_need_update = db_up
      .as_ref()
      .map_or(true, |db_up| up.update_time > db_up.update_time);
    if db_need_update {
      tracing::debug!("已更新数据库中的用户信息");
      let user_public = serde_json::to_string(&up)?;
      // Clone the signer in its own statement so the user mutex is released
      // before `put_user` is awaited.
      let signer = self.user.lock().await.clone();
      let signed_user_public =
        serde_json::to_string(&SignerSigned::from_value(&signer, &up)?)?;
      self
        .put_user(UserVO {
          pub_key: up.pub_key.clone(),
          user_public,
          signed_user_public,
        })
        .await?;
    }

    /* TODO: notify the application through the EventBus that the UserPublic
     * information has been updated */
    Ok(())
  }

  /// Requests from `addr` the crypted CRDT events, sending the local
  /// frontiers as the `frontiers` query parameter (JSON encoded).
  ///
  /// The response body is expected to be a JSON object whose `data` field is
  /// an array of [`CrdtCryptedEventVO`].
  ///
  /// # Errors
  ///
  /// Returns an error when the local frontiers cannot be computed, the request
  /// fails or returns an error status, or the response does not have the
  /// expected shape. A malformed response is reported as an error instead of
  /// panicking.
  async fn fetch_crdt_event(
    &self,
    addr: &str,
  ) -> anyhow::Result<Vec<CrdtCryptedEventVO>> {
    let u = self.user.lock().await.clone();

    let frontiers = self.crdt_get_frontiers().await?;
    let frontiers = serde_json::to_string(&frontiers)?;

    let c = reqwest::Client::new();
    let response = c
      .get(&format!("{}/crdt-events", addr))
      .query(&[("frontiers", frontiers)])
      .with_signer_auth(&u)
      .send()
      .await?
      .error_for_status()?;

    let mut json = response.json::<serde_json::Value>().await?;
    // Move the `data` value out of the body instead of cloning each element.
    let data = json
      .get_mut("data")
      .map(serde_json::Value::take)
      .context("crdt events response has no `data` field")?;
    let events: Vec<CrdtCryptedEventVO> = serde_json::from_value(data)
      .context("crdt events response `data` is not a valid event list")?;

    Ok(events)
  }

  /// Requests the remote frontiers from `{addr}/crdt-frontiers`.
  ///
  /// The response body is decoded directly as a JSON map from peer to clock.
  ///
  /// # Errors
  ///
  /// Returns an error when the request fails, the server answers with an
  /// error status, or the body cannot be decoded.
  async fn fetch_crdt_frontiers(
    &self,
    addr: &str,
  ) -> anyhow::Result<HashMap<String, i32>> {
    let signer = self.user.lock().await.clone();
    let url = format!("{}/crdt-frontiers", addr);

    let response = reqwest::Client::new()
      .get(&url)
      .with_signer_auth(&signer)
      .send()
      .await?
      .error_for_status()?;

    Ok(response.json::<HashMap<String, i32>>().await?)
  }

  /// Posts `events` to `{addr}/crdt-events` as the JSON body
  /// `{"data": [...]}`, authenticated as the current user.
  ///
  /// # Errors
  ///
  /// Returns an error when the request fails or the server answers with an
  /// error status.
  async fn push_crdt_event(
    &self,
    addr: &str,
    events: Vec<CrdtCryptedEventVO>,
  ) -> anyhow::Result<()> {
    let signer = self.user.lock().await.clone();
    let url = format!("{}/crdt-events", addr);
    let payload = serde_json::json!({
      "data": events,
    });

    reqwest::Client::new()
      .post(&url)
      .with_signer_auth(&signer)
      .json(&payload)
      .send()
      .await?
      .error_for_status()?;

    Ok(())
  }
}