use std::{collections::HashSet, sync::Arc};
use anyhow::Context;
use eventsource_client::{Client, SSE};
use futures::StreamExt;
use serde::{Deserialize, Serialize};
use signer_core::{
SignerJWT, SignerJWTClaims, SignerJWTHeader, SignerSigned, SignerUser,
SignerUserPublic,
};
use signer_hub_kit::models::PostCrdtEventsRequest;
use tokio::sync::RwLock;
use crate::{
OpenapiConfiguration, SignerDaemon,
model::viewobject::{
CrdtCryptedEventVO, CrdtEventVO, Envelope, MessageVO, UserVO,
},
};
/// Handle for talking to one remote server on behalf of a local daemon.
///
/// Every API call in this file builds its `OpenapiConfiguration` from `addr`;
/// the optional sender is the stop signal for the background event-source
/// task started by `open_eventsource`.
pub struct SignerRemote {
/// Address of the remote; passed to `OpenapiConfiguration::new` for each
/// request and handed to the event source when one is opened.
addr: String,
/// Sender used by `close` to signal the spawned event-source task.
/// `None` until `open_eventsource` stores it, and reset to `None` by `close`.
event_source_terminator: Option<tokio::sync::mpsc::Sender<()>>,
}
/// Notification kinds carried in the data of a server-sent event.
///
/// The event loop parses each event's data as JSON into this enum and runs
/// the matching sync; data that does not parse is skipped.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum EventData {
/// Handled by running `sync_crdt_event` against the daemon.
NewCrdtEvents,
/// Handled by running `sync_envelopes` against the daemon.
NewEnvelopes,
}
impl SignerRemote {
/// Creates a remote for `addr` with no event source attached.
///
/// The address is copied into the returned value; nothing is contacted here.
pub fn new(addr: &str) -> Self {
    let addr = String::from(addr);
    Self {
        addr,
        event_source_terminator: None,
    }
}
/// Calls the remote's `api_signer_get` endpoint as `user`.
///
/// The response body is discarded; only success or failure is reported.
///
/// # Errors
/// Returns the error produced by the API call.
pub async fn ping(&self, user: &SignerUser) -> anyhow::Result<()> {
    let config =
        OpenapiConfiguration::new(user.clone(), self.addr.clone()).build();
    signer_hub_kit::apis::default_api::api_signer_get(&config).await?;
    Ok(())
}
/// Downloads CRDT events from the remote and inserts them into the local
/// event store.
///
/// The local event frontiers are serialized to JSON and sent with the
/// request. Each returned item is converted to a `CrdtCryptedEventVO`,
/// decrypted with the daemon's user, and the whole batch is inserted in one
/// call. Nothing is inserted when the remote returns no events.
///
/// # Errors
/// Returns an error when reading the frontiers, the request, decrypting any
/// event, or the insert fails. A decryption failure aborts the whole pull
/// before anything is inserted.
pub(crate) async fn pull_crdt_event(
    &self,
    daemon: &mut SignerDaemon,
) -> anyhow::Result<()> {
    let u = daemon.state.user.clone();
    let frontiers = daemon.controller.crdt_event.get_event_frontiers().await?;
    let frontiers = serde_json::to_string(&frontiers)?;
    let r = signer_hub_kit::apis::default_api::api_signer_crdt_events_get(
        &OpenapiConfiguration::new(u.clone(), self.addr.clone()).build(),
        Some(&frontiers),
    )
    .await?;
    // The payload comes from the network, so a decryption failure is reported
    // as an error instead of panicking. Only `Debug` is required of the
    // error, exactly as the former `expect` did.
    let events = r
        .data
        .into_iter()
        .map(|raw| {
            let crypted: CrdtCryptedEventVO = raw.into();
            crypted.decrypt(&u).map_err(|err| {
                anyhow::anyhow!("decrypt crdt crypted event failed: {:?}", err)
            })
        })
        .collect::<anyhow::Result<Vec<CrdtEventVO>>>()?;
    if !events.is_empty() {
        daemon
            .controller
            .crdt_event
            .insert_events(events)
            .await
            .context("crdt insert events failed")?;
    }
    Ok(())
}
/// Encrypts local CRDT events and posts them to the remote.
///
/// The events are obtained from `get_event_from_frontiers` using the local
/// event frontiers, wrapped into `CrdtCryptedEventVO`s for the daemon's user,
/// and sent in a single `PostCrdtEventsRequest`. No request is made when
/// there are no events.
///
/// # Errors
/// Returns an error when reading the frontiers or events, encrypting any
/// event, or the request fails. An encryption failure aborts the push before
/// anything is sent.
pub async fn push_crdt_event(
    &self,
    daemon: &mut SignerDaemon,
) -> anyhow::Result<()> {
    let u = daemon.state.user.clone();
    let frontiers = daemon.controller.crdt_event.get_event_frontiers().await?;
    let local_events = daemon
        .controller
        .crdt_event
        .get_event_from_frontiers(frontiers)
        .await?;
    // This function already returns a `Result`, so an encryption failure is
    // propagated rather than panicking the caller's task.
    let events = local_events
        .into_iter()
        .map(|e| {
            CrdtCryptedEventVO::from_data(&u, &e).map_err(|err| {
                anyhow::anyhow!("create crdt crypted event failed: {:?}", err)
            })
        })
        .collect::<anyhow::Result<Vec<CrdtCryptedEventVO>>>()?;
    if !events.is_empty() {
        let req = PostCrdtEventsRequest {
            data: events.into_iter().map(|i| i.into()).collect(),
        };
        signer_hub_kit::apis::default_api::api_signer_crdt_events_post(
            &OpenapiConfiguration::new(u, self.addr.clone()).build(),
            req,
        )
        .await?;
    }
    Ok(())
}
/// Runs a full CRDT sync round against the remote.
///
/// Steps, in order: pull remote events, push local events, apply everything
/// through the CRDT adapter, then call `sync_user_public` on the daemon.
/// The first failing step ends the round.
///
/// # Errors
/// Pull and push failures are wrapped with "crdt pull failed" and
/// "crdt push failed"; later failures are returned as they are.
pub async fn sync_crdt_event(
    &self,
    daemon: &mut SignerDaemon,
) -> anyhow::Result<()> {
    let pulled = self.pull_crdt_event(daemon).await;
    pulled.context("crdt pull failed")?;

    let pushed = self.push_crdt_event(daemon).await;
    pushed.context("crdt push failed")?;

    daemon.controller.crdt_adapter.apply_all().await?;
    daemon.sync_user_public().await?;
    Ok(())
}
/// Synchronizes envelopes with the remote by delegating to `pull_message`.
///
/// # Errors
/// Any failure from `pull_message`, wrapped with
/// "remote pull message failed".
pub async fn sync_envelopes(
    &self,
    daemon: &mut SignerDaemon,
) -> anyhow::Result<()> {
    let outcome = self.pull_message(daemon).await;
    outcome.context("remote pull message failed")
}
pub async fn pull_message(
&self,
daemon: &mut SignerDaemon,
) -> anyhow::Result<()> {
let user = daemon.state.user.clone();
let r = signer_hub_kit::apis::default_api::api_signer_envelopes_get(
&OpenapiConfiguration::new(user.clone(), self.addr.clone()).build(),
)
.await?;
let mut success_set: HashSet<i32> = HashSet::new();
for e in r.data {
let id = e.id;
let envelope: Envelope = e.into();
let message = match envelope.open(&user) {
Ok(val) => val.message,
Err(_) => continue,
};
daemon.store.message.put(message).await?;
success_set.insert(id);
}
let ids = success_set.into_iter().collect::<Vec<i32>>();
signer_hub_kit::apis::default_api::api_signer_envelopes_check_patch(
&OpenapiConfiguration::new(user.clone(), self.addr.clone()).build(),
ids,
)
.await?;
Ok(())
}
/// Wraps each message in an envelope addressed to this remote and sends it.
///
/// All envelopes are created before the first one is sent, so a creation
/// failure means nothing is sent. Sending stops at the first failure.
///
/// # Errors
/// Returns the first error from `Envelope::create` or `Envelope::send`.
pub async fn push_message(
    &self,
    daemon: &mut SignerDaemon,
    data: Vec<MessageVO>,
) -> anyhow::Result<()> {
    // Phase 1: build every envelope up front.
    let mut outbox = Vec::new();
    for message in data {
        let targets = vec![self.addr.clone()];
        outbox.push(Envelope::create(&daemon, &message, targets).await?);
    }

    // Phase 2: send them in the order they were created.
    let sender = &daemon.state.user;
    for envelope in outbox {
        envelope.send(sender).await?;
    }
    Ok(())
}
/// Fetches the signed public record for `user_key` from the remote, verifies
/// it, and stores it in the daemon's user store.
///
/// The stored `UserVO` keeps the verified value and the signed wrapper, each
/// serialized as JSON, alongside the public key taken from the verified
/// value.
///
/// # Errors
/// Returns an error when the request, signature verification, JSON
/// serialization, or the store write fails.
pub async fn pull_user(
    &self,
    daemon: &mut SignerDaemon,
    user_key: &str,
) -> anyhow::Result<()> {
    let config =
        OpenapiConfiguration::new(daemon.state.user.clone(), self.addr.clone())
            .build();
    let response =
        signer_hub_kit::apis::default_api::api_signer_users_user_key_get(
            &config, user_key,
        )
        .await?;

    let signed = SignerSigned::<SignerUserPublic> {
        sig: response.sig,
        msg: response.msg,
        pubkey: response.pubkey,
        _marker: Default::default(),
    };
    // Nothing is stored unless the signature checks out.
    let public = signed.verify_to_value()?;

    let user_public = serde_json::to_string(&public)?;
    let signed_user_public = serde_json::to_string(&signed)?;
    let record = UserVO {
        pub_key: public.pub_key.clone(),
        user_public,
        signed_user_public,
    };
    daemon.store.user.put(&mut daemon.state, record).await?;
    Ok(())
}
pub async fn open_eventsource(
&mut self,
daemon: Arc<RwLock<SignerDaemon>>,
) -> anyhow::Result<()> {
if self.event_source_terminator.is_some() {
return Err(anyhow::anyhow!("event source already exists"));
}
let (tx, mut es) = SignerRemoteEventSource::new(&self.addr, daemon.clone());
self.event_source_terminator = Some(tx);
tokio::spawn(async move {
es.open_eventsource()
.await
.expect("open eventsource failed");
});
Ok(())
}
/// Signals the background event-source task to stop and forgets its
/// terminator.
///
/// Does nothing when no event source has been opened. The result of the send
/// is ignored.
pub async fn close(&mut self) {
    let terminator = match self.event_source_terminator.as_ref() {
        Some(tx) => tx,
        None => return,
    };
    let _ = terminator.send(()).await;
    self.event_source_terminator = None;
}
}
/// State owned by the task that listens to a remote's event stream.
pub struct SignerRemoteEventSource {
/// Address of the remote; prefix of the `/api/signer/events` URL and the
/// address given to the `SignerRemote` used for syncing.
addr: String,
/// Shared daemon; read once for the current user and write-locked while a
/// sync runs.
daemon: Arc<RwLock<SignerDaemon>>,
/// Receiving end of the stop channel; the event loop exits when `recv`
/// completes.
rx: tokio::sync::mpsc::Receiver<()>,
}
impl SignerRemoteEventSource {
/// Creates an event source for `addr` together with the sender that stops it.
///
/// The stop channel has a capacity of one; the receiving end is kept inside
/// the returned event source.
fn new(
    addr: &str,
    daemon: Arc<RwLock<SignerDaemon>>,
) -> (tokio::sync::mpsc::Sender<()>, Self) {
    let (terminator, rx) = tokio::sync::mpsc::channel(1);
    let source = Self {
        addr: addr.to_owned(),
        daemon,
        rx,
    };
    (terminator, source)
}
/// Runs the event loop until a stop signal arrives or a step fails.
///
/// Each pass of the outer loop:
/// 1. builds a JWT for the daemon's user with a five-minute expiry and
///    creates an event-source client for `{addr}/api/signer/events` that
///    carries it as a bearer token;
/// 2. runs a CRDT sync and an envelope sync under the daemon write lock;
/// 3. reads events, running the matching sync for each `EventData` that
///    parses. Non-event items and unparsable data are skipped.
///
/// A stream error, or the stream ending, leaves the inner loop so the next
/// pass creates a fresh client. The loop ends with `Ok(())` when `rx.recv()`
/// completes.
///
/// # Errors
/// Returns an error when encoding the JWT, building the client, or any sync
/// fails.
async fn open_eventsource(&mut self) -> anyhow::Result<()> {
    let u = self.daemon.read().await.state.user.clone();
    let remote = SignerRemote::new(&self.addr);
    'outer: loop {
        let jwt = SignerJWT::new(
            SignerJWTHeader::default(&u),
            SignerJWTClaims::default(
                &u,
                self.addr.clone(),
                uuid::Uuid::new_v4().to_string(),
            )
            .with_expired_duration(chrono::Duration::minutes(5)),
        );
        // Report an encoding failure as an error rather than panicking; only
        // `Debug` is required of the error, as with the former `unwrap`.
        let token = jwt
            .encode(&u)
            .map_err(|err| anyhow::anyhow!("encode jwt failed: {:?}", err))?;
        let client = eventsource_client::ClientBuilder::for_url(&format!(
            "{}/api/signer/events",
            &self.addr
        ))
        .context("create eventsource client failed")?
        .header("Authorization", &format!("Bearer {}", &token))
        .context("add header failed")?
        .build_http();
        let mut stream = Box::pin(client.stream());
        {
            // Sync once per (re)connection, before reading any events.
            let daemon = &mut *self.daemon.write().await;
            remote.sync_crdt_event(daemon).await?;
            remote.sync_envelopes(daemon).await?;
        }
        'inner: loop {
            let next = tokio::select! {
                event = stream.next() => event,
                _ = self.rx.recv() => {
                    break 'outer;
                },
            };
            let sse = match next {
                None => {
                    // A finished stream yields nothing further; polling it
                    // again would spin, so reconnect instead.
                    tracing::warn!(
                        "event stream from {} ended. reconnecting.",
                        &self.addr
                    );
                    break 'inner;
                }
                Some(Err(err)) => {
                    tracing::error!(
                        "receive message error from {}: {}. reconnecting.",
                        &self.addr,
                        err
                    );
                    break 'inner;
                }
                Some(Ok(sse)) => sse,
            };
            let event = match sse {
                SSE::Event(event) => event,
                _ => continue,
            };
            let ed: EventData = match serde_json::from_str(&event.data) {
                Err(_) => continue,
                Ok(ed) => ed,
            };
            match ed {
                EventData::NewCrdtEvents => {
                    remote
                        .sync_crdt_event(&mut *self.daemon.write().await)
                        .await
                        .context("sync crdt event failed")?;
                }
                EventData::NewEnvelopes => {
                    remote
                        .sync_envelopes(&mut *self.daemon.write().await)
                        .await
                        .context("sync envelopes failed")?;
                }
            }
        }
    }
    Ok(())
}
}
#[cfg(test)]
mod test {
    use signer_core::SignerUser;

    use crate::{
        SignerDaemon, model::viewobject::ServerVO, signer_remote::SignerRemote,
    };

    /// Stores a server entry in an in-memory daemon and finalizes the daemon.
    #[tokio::test]
    async fn test_signer_connection() -> anyhow::Result<()> {
        let alice = SignerUser::generete("alice")?;
        let shared = SignerDaemon::from_memory(&alice, "test").await?;

        let server = ServerVO {
            addr: "http://127.0.0.1:8080".to_string(),
            enable: true,
            limitation_set: "{}".to_string(),
        };
        {
            // Scope the write guard so it is released before `finalize`.
            let mut guard = shared.write().await;
            let daemon = &mut *guard;
            daemon.store.server.put(&mut daemon.state, server).await?;
        }

        shared.write().await.finalize().await?;
        Ok(())
    }

    /// Pings the remote at `http://localhost:8080`, pulls the user's own
    /// public record from it, and checks the record is in the user store.
    #[tokio::test]
    async fn test_pull_user() -> anyhow::Result<()> {
        let user = SignerUser::generete("alice")?;
        let peer_id = uuid::Uuid::new_v4().to_string();
        let shared = SignerDaemon::from_memory(&user, &peer_id).await?;

        let remote = SignerRemote::new("http://localhost:8080");
        remote.ping(&user).await?;

        let key = &user.public.pub_key;
        {
            let mut guard = shared.write().await;
            remote.pull_user(&mut *guard, key).await?;
        }

        let stored = shared.read().await.store.user.get(key).await?;
        assert!(stored.is_some());
        Ok(())
    }
}