signer-daemon 0.2.3

Signer daemon package.
Documentation
use std::sync::Arc;

use anyhow::Context;
use eventsource_client::{Client, SSE};
use futures::StreamExt;
use serde::{Deserialize, Serialize};
use signer_core::{SignerJWT, SignerJWTClaims, SignerJWTHeader};

use crate::{SignerDaemon, entity::server};

#[derive(Debug)]
pub struct SignerConnection {
  server: server::Model,
  daemon: SignerDaemon,
  closer: tokio::sync::mpsc::Sender<()>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum EventData {
  NewCrdtEvents,
  NewEnvelopes,
}

impl SignerConnection {
  pub async fn new(daemon: &SignerDaemon, s: &server::Model) -> Arc<Self> {
    tracing::debug!("create new signer connection to {}", &s.addr);

    let (tx, mut rx) = tokio::sync::mpsc::channel::<()>(1);

    daemon
      .ping_server(&s.addr)
      .await
      .expect(&format!("ping server {} failed", &s.addr));

    let u = daemon.user.lock().await.clone();

    let connection = Arc::new(Self {
      server: s.clone(),
      closer: tx,
      daemon: daemon.clone(),
    });

    tokio::spawn({
      let s = s.clone();
      let connection = connection.clone();

      async move {
        'outer: loop {
          let jwt = SignerJWT::new(
            SignerJWTHeader::default(&u),
            SignerJWTClaims::default(
              &u,
              format!("*"),
              uuid::Uuid::new_v4().to_string(),
            )
            .with_expired_duration(chrono::Duration::minutes(5)),
          );
          let client = eventsource_client::ClientBuilder::for_url(&format!(
            "{}/events",
            &s.addr
          ))
          .expect("create eventsource client failed")
          .header(
            "Authorization",
            &format!("Bearer {}", &jwt.encode(&u).unwrap()),
          )
          .expect("add header failed")
          .build_http();

          let mut stream = Box::pin(client.stream());

          'inner: loop {
            let e = tokio::select! {
              event = stream.next() => event,
              _ = rx.recv() => {
                break 'outer;
              },
            };

            let e = match e {
              None => {
                tracing::warn!("receive empty message from {}", &s.addr);
                continue;
              }
              Some(e) => match e {
                Err(err) => {
                  tracing::error!(
                    "receive message error from {}: {}. reconnecting.",
                    &s.addr,
                    err
                  );
                  break 'inner;
                }
                Ok(e) => e,
              },
            };

            let e = match e {
              SSE::Event(e) => e,
              _ => continue,
            };

            let ed: EventData = match serde_json::from_str(&e.data) {
              Err(_) => continue,
              Ok(ed) => ed,
            };

            match ed {
              EventData::NewCrdtEvents => {
                connection
                  .sync_crdt_event()
                  .await
                  .expect("sync crdt event failed");
              }
              EventData::NewEnvelopes => {
                connection
                  .sync_envelopes()
                  .await
                  .expect("sync envelopes failed");
              }
            }
          }
        }
      }
    });

    tokio::spawn({
      let connection = connection.clone();
      async move {
        let _ = connection.sync_crdt_event().await;
        let _ = connection.sync_envelopes().await;
      }
    });

    connection
  }

  // 使用 frontiers 同步 CRDT 信息
  pub async fn sync_crdt_event(&self) -> anyhow::Result<()> {
    // 从远端拉取 CRDT 信息进行同步
    self
      .daemon
      .crdt_pull(&self.server.addr)
      .await
      .context("crdt pull failed")?;
    // 将本地 CRDT 信息推送到远端
    self
      .daemon
      .crdt_push(&self.server.addr)
      .await
      .context("crdt push failed")?;

    Ok(())
  }

  // 使用 Envelopes 同步消息
  pub async fn sync_envelopes(&self) -> anyhow::Result<()> {
    self
      .daemon
      .remote_pull_message(&self.server.addr)
      .await
      .context("remote pull message failed")
  }

  pub async fn close(&self) {
    let _ = self.closer.send(()).await;
  }
}

#[cfg(test)]
mod test {
  use std::time::Duration;

  use signer_core::SignerUser;

  use crate::{SignerDaemon, model::viewobject::ServerVO};

  #[tokio::test]
  async fn test_signer_connection() -> anyhow::Result<()> {
    let alice = SignerUser::generete("alice")?;
    let alice_daemon = SignerDaemon::from_memory(&alice, "test").await?;

    alice_daemon
      .put_server(ServerVO {
        addr: "http://127.0.0.1:8080/api/signer".to_string(),
        enable: true,
        limitation_set: "{}".to_string(),
      })
      .await?;

    tokio::time::sleep(Duration::from_secs(1)).await;

    alice_daemon.finalize().await?;

    Ok(())
  }
}