use std::sync::Arc;
use anyhow::Context;
use eventsource_client::{Client, SSE};
use futures::StreamExt;
use serde::{Deserialize, Serialize};
use signer_core::{SignerJWT, SignerJWTClaims, SignerJWTHeader};
use crate::{SignerDaemon, entity::server};
/// A connection between the local daemon and one remote server.
///
/// Created by [`SignerConnection::new`], which also spawns the background
/// task that listens for server events; [`SignerConnection::close`] signals
/// that task to stop.
#[derive(Debug)]
pub struct SignerConnection {
    // Server this connection talks to; its `addr` is the target of every sync call.
    server: server::Model,
    // Daemon handle used to run the CRDT pull/push and the remote message pull.
    daemon: SignerDaemon,
    // Sending half of the shutdown channel; `close` sends `()` on it and the
    // event listener task breaks out of its loop when it receives.
    closer: tokio::sync::mpsc::Sender<()>,
}
/// Payload of a server-sent event, deserialized from the event's `data`
/// field as JSON by the listener task in [`SignerConnection::new`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum EventData {
    /// Handled by running [`SignerConnection::sync_crdt_event`].
    NewCrdtEvents,
    /// Handled by running [`SignerConnection::sync_envelopes`].
    NewEnvelopes,
}
impl SignerConnection {
pub async fn new(daemon: &SignerDaemon, s: &server::Model) -> Arc<Self> {
tracing::debug!("create new signer connection to {}", &s.addr);
let (tx, mut rx) = tokio::sync::mpsc::channel::<()>(1);
daemon
.ping_server(&s.addr)
.await
.expect(&format!("ping server {} failed", &s.addr));
let u = daemon.user.lock().await.clone();
let connection = Arc::new(Self {
server: s.clone(),
closer: tx,
daemon: daemon.clone(),
});
tokio::spawn({
let s = s.clone();
let connection = connection.clone();
async move {
'outer: loop {
let jwt = SignerJWT::new(
SignerJWTHeader::default(&u),
SignerJWTClaims::default(
&u,
format!("*"),
uuid::Uuid::new_v4().to_string(),
)
.with_expired_duration(chrono::Duration::minutes(5)),
);
let client = eventsource_client::ClientBuilder::for_url(&format!(
"{}/events",
&s.addr
))
.expect("create eventsource client failed")
.header(
"Authorization",
&format!("Bearer {}", &jwt.encode(&u).unwrap()),
)
.expect("add header failed")
.build_http();
let mut stream = Box::pin(client.stream());
'inner: loop {
let e = tokio::select! {
event = stream.next() => event,
_ = rx.recv() => {
break 'outer;
},
};
let e = match e {
None => {
tracing::warn!("receive empty message from {}", &s.addr);
continue;
}
Some(e) => match e {
Err(err) => {
tracing::error!(
"receive message error from {}: {}. reconnecting.",
&s.addr,
err
);
break 'inner;
}
Ok(e) => e,
},
};
let e = match e {
SSE::Event(e) => e,
_ => continue,
};
let ed: EventData = match serde_json::from_str(&e.data) {
Err(_) => continue,
Ok(ed) => ed,
};
match ed {
EventData::NewCrdtEvents => {
connection
.sync_crdt_event()
.await
.expect("sync crdt event failed");
}
EventData::NewEnvelopes => {
connection
.sync_envelopes()
.await
.expect("sync envelopes failed");
}
}
}
}
}
});
tokio::spawn({
let connection = connection.clone();
async move {
let _ = connection.sync_crdt_event().await;
let _ = connection.sync_envelopes().await;
}
});
connection
}
pub async fn sync_crdt_event(&self) -> anyhow::Result<()> {
self
.daemon
.crdt_pull(&self.server.addr)
.await
.context("crdt pull failed")?;
self
.daemon
.crdt_push(&self.server.addr)
.await
.context("crdt push failed")?;
Ok(())
}
pub async fn sync_envelopes(&self) -> anyhow::Result<()> {
self
.daemon
.remote_pull_message(&self.server.addr)
.await
.context("remote pull message failed")
}
pub async fn close(&self) {
let _ = self.closer.send(()).await;
}
}
#[cfg(test)]
mod test {
    use std::time::Duration;

    use signer_core::SignerUser;

    use crate::{SignerDaemon, model::viewobject::ServerVO};

    /// Registers a server on a daemon built with `SignerDaemon::from_memory`,
    /// waits one second, then finalizes the daemon.
    ///
    /// The registered address points at `127.0.0.1:8080`.
    // NOTE(review): presumably this needs a server listening there to pass — confirm.
    #[tokio::test]
    async fn test_signer_connection() -> anyhow::Result<()> {
        let user = SignerUser::generete("alice")?;
        let daemon = SignerDaemon::from_memory(&user, "test").await?;

        let server = ServerVO {
            addr: String::from("http://127.0.0.1:8080/api/signer"),
            enable: true,
            limitation_set: String::from("{}"),
        };
        daemon.put_server(server).await?;

        // NOTE(review): presumably gives background work time to run — confirm.
        tokio::time::sleep(Duration::from_secs(1)).await;

        daemon.finalize().await?;
        Ok(())
    }
}