signer-daemon 0.2.1

Signer daemon package.
Documentation
use std::sync::Arc;

use eventsource_client::{Client, SSE};
use futures::StreamExt;
use serde::{Deserialize, Serialize};
use signer_core::{SignerJWT, SignerJWTClaims, SignerJWTHeader};

use crate::{SignerDaemon, entity::server};

/// A live link between the local daemon and one remote signer server.
///
/// Created by [`SignerConnection::new`], which also spawns the background
/// task that listens for server-sent events and triggers synchronisation.
#[derive(Debug)]
pub struct SignerConnection {
  /// Database record of the remote server; its `addr` is the base URL used
  /// for the event stream and for every sync call.
  server: server::Model,
  /// Handle to the daemon that performs the actual pull/push operations.
  daemon: SignerDaemon,
  /// Sending half of the stop channel; a message on it tells the background
  /// event listener to exit.
  closer: tokio::sync::mpsc::Sender<()>,
}

/// Notification payload carried in the `data` field of a server-sent event.
///
/// The listener decodes it from JSON with `serde_json`; payloads that do not
/// decode into one of these variants are skipped.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum EventData {
  /// New CRDT events are available; handled by running `sync_crdt_event`.
  NewCrdtEvents,
  /// New envelopes are available; handled by running `sync_envelopes`.
  NewEnvelopes,
}

impl SignerConnection {
  /// Creates a connection to the signer server `s` and starts its background tasks.
  ///
  /// Two tasks are spawned on the current Tokio runtime:
  ///
  /// * an event listener that subscribes to `{addr}/events` over SSE. Each
  ///   (re)connection authenticates with a freshly signed JWT (created with a
  ///   five minute expiry duration) passed in the `Cookie` header. Incoming
  ///   [`EventData`] notifications trigger [`Self::sync_crdt_event`] or
  ///   [`Self::sync_envelopes`].
  /// * a one-shot initial synchronisation.
  ///
  /// The listener reconnects (after a short delay) whenever the stream fails
  /// or ends, and exits once [`Self::close`] has been called. Failures while
  /// syncing are logged and never terminate the listener. Failures that a
  /// retry cannot fix (JWT encoding, invalid URL or header) are logged and
  /// stop the listener.
  ///
  /// NOTE(review): the listener task owns a clone of the returned `Arc`, so
  /// the connection is not dropped while the listener is running; callers must
  /// call `close` explicitly to stop it.
  pub async fn new(daemon: &SignerDaemon, s: &server::Model) -> Arc<Self> {
    tracing::debug!("create new signer connection to {}", &s.addr);

    let (tx, mut rx) = tokio::sync::mpsc::channel::<()>(1);
    let u = daemon.user.lock().await.clone();

    let connection = Arc::new(Self {
      server: s.clone(),
      closer: tx,
      daemon: daemon.clone(),
    });

    tokio::spawn({
      let connection = connection.clone();

      async move {
        let addr = connection.server.addr.clone();
        let events_url = format!("{}/events", &addr);
        // Pause between connection attempts so an unreachable server does not
        // turn the reconnect loop into a busy loop.
        let reconnect_delay = std::time::Duration::from_secs(1);

        'outer: loop {
          // A new short-lived token is signed for every connection attempt.
          let jwt = SignerJWT::new(
            SignerJWTHeader::default(&u),
            SignerJWTClaims::default(
              &u,
              "*".to_string(),
              uuid::Uuid::new_v4().to_string(),
            )
            .with_expired_duration(chrono::Duration::minutes(5)),
          );
          let token = match jwt.encode(&u) {
            Ok(token) => token,
            Err(err) => {
              tracing::error!(
                "encode jwt for {} failed: {:?}. stop listening.",
                &addr,
                err
              );
              return;
            }
          };

          let builder =
            match eventsource_client::ClientBuilder::for_url(&events_url) {
              Ok(builder) => builder,
              Err(err) => {
                tracing::error!(
                  "create eventsource client for {} failed: {:?}. stop listening.",
                  &addr,
                  err
                );
                return;
              }
            };
          let builder = match builder
            .header("Cookie", &format!("Authorization={}", token))
          {
            Ok(builder) => builder,
            Err(err) => {
              tracing::error!(
                "add header for {} failed: {:?}. stop listening.",
                &addr,
                err
              );
              return;
            }
          };
          let client = builder.build_http();

          let mut stream = Box::pin(client.stream());

          'inner: loop {
            let event = tokio::select! {
              event = stream.next() => event,
              _ = rx.recv() => {
                break 'outer;
              },
            };

            let event = match event {
              // `None` means the stream has terminated; polling it again is
              // pointless, so build a new connection instead.
              None => {
                tracing::warn!(
                  "event stream from {} ended. reconnecting.",
                  &addr
                );
                break 'inner;
              }
              Some(Err(err)) => {
                tracing::error!(
                  "receive message error from {}: {}. reconnecting.",
                  &addr,
                  err
                );
                break 'inner;
              }
              Some(Ok(event)) => event,
            };

            // Only data events are of interest; comments etc. are skipped.
            let event = match event {
              SSE::Event(event) => event,
              _ => continue,
            };

            let data: EventData = match serde_json::from_str(&event.data) {
              Ok(data) => data,
              Err(err) => {
                tracing::debug!(
                  "ignore unknown event payload from {}: {}",
                  &addr,
                  err
                );
                continue;
              }
            };

            // A failed sync (e.g. a transient network error) must not kill
            // the listener: log it and keep waiting for further events.
            match data {
              EventData::NewCrdtEvents => {
                if let Err(err) = connection.sync_crdt_event().await {
                  tracing::error!(
                    "sync crdt event with {} failed: {:#}",
                    &addr,
                    err
                  );
                }
              }
              EventData::NewEnvelopes => {
                if let Err(err) = connection.sync_envelopes().await {
                  tracing::error!(
                    "sync envelopes with {} failed: {:#}",
                    &addr,
                    err
                  );
                }
              }
            }
          }

          // Back off before reconnecting, but stay responsive to `close`.
          tokio::select! {
            _ = tokio::time::sleep(reconnect_delay) => {},
            _ = rx.recv() => {
              break 'outer;
            },
          }
        }
      }
    });

    // Initial sync so the connection is up to date before the first event
    // arrives. Best effort: failures are only logged.
    tokio::spawn({
      let connection = connection.clone();
      async move {
        if let Err(err) = connection.sync_crdt_event().await {
          tracing::warn!(
            "initial crdt sync with {} failed: {:#}",
            &connection.server.addr,
            err
          );
        }
        if let Err(err) = connection.sync_envelopes().await {
          tracing::warn!(
            "initial envelope sync with {} failed: {:#}",
            &connection.server.addr,
            err
          );
        }
      }
    });

    connection
  }

  /// Synchronises CRDT data with the remote server (frontier-based sync).
  ///
  /// Pulls first, then pushes. The first failing step aborts the sync and its
  /// error is returned.
  pub async fn sync_crdt_event(&self) -> anyhow::Result<()> {
    let remote = &self.server.addr;

    // Pull the CRDT data from the remote side and merge it locally.
    self.daemon.crdt_pull(remote).await?;

    // Push the local CRDT data to the remote side.
    self.daemon.crdt_push(remote).await?;

    Ok(())
  }

  /// Synchronises messages with the remote server via envelopes.
  ///
  /// Delegates to the daemon's `remote_pull_message` for this connection's
  /// server address and propagates its error.
  pub async fn sync_envelopes(&self) -> anyhow::Result<()> {
    let remote = &self.server.addr;
    self.daemon.remote_pull_message(remote).await?;

    Ok(())
  }

  pub fn close(&self) {
    if self.closer.is_closed() {
      return;
    }

    let (tx, rx) = std::sync::mpsc::channel();
    tokio::spawn({
      let closer = self.closer.clone();
      async move {
        let _ = closer.send(()).await;
        tx.send(())
          .expect("send signer connection terminate signal failed");
      }
    });

    let _ = rx.recv();
  }
}

/// Closes the connection when the value is dropped.
impl Drop for SignerConnection {
  fn drop(&mut self) {
    // Delegate to `close` so an explicit close and a drop behave the same.
    Self::close(self);
  }
}

#[cfg(test)]
mod test {
  use std::time::Duration;

  use sea_orm::{ActiveModelTrait as _, ActiveValue::Set};
  use signer_core::{SignerFsStore, SignerUser};

  use crate::{SignerDaemon, entity::server};

  /// Smoke test for the connection lifecycle: register an enabled server,
  /// let the daemon set up its connections, wait a moment, then finalize.
  ///
  /// NOTE(review): stores data under `./testdir` and targets a server at
  /// `127.0.0.1:8080`; presumably meant to be run with a local server.
  #[tokio::test]
  async fn test_signer_connection() -> anyhow::Result<()> {
    // Create a user and persist it in the file-system store.
    let store = SignerFsStore::new("./testdir".to_string());
    let user = SignerUser::generete("alice")?;
    store.save_signer_user(&user)?;

    let daemon = SignerDaemon::from_sfs(&user, &store).await?;

    // Register one enabled server in the daemon's database.
    let record = server::ActiveModel {
      addr: Set("http://127.0.0.1:8080/api/signer".to_string()),
      enable: Set(true),
      ..Default::default()
    };
    record.insert(&daemon.db).await?;

    daemon.update_connections().await?;

    // Give the background tasks a moment to run before shutting down.
    let grace_period = Duration::from_secs(1);
    tokio::time::sleep(grace_period).await;

    daemon.finalize().await?;

    Ok(())
  }
}