use std::{collections::HashSet, sync::Arc};
use eventsource_client::{Client, SSE};
use futures::StreamExt;
use serde::{Deserialize, Serialize};
use signer_core::{
SignerJWT, SignerJWTClaims, SignerJWTHeader, SignerSigned, SignerUser, SignerUserPublic,
};
use tokio::sync::RwLock;
use crate::{
GetCrdtEventsResponse, GetEnvelopesResponse, HttpClient, HttpClientConfig,
OApiSignerSignedOApiSignerUserPublic, PostCrdtEventsRequest, SignerDaemon,
model::viewobject::{CrdtCryptedEventVO, CrdtEventVO, Envelope, MessageVO, UserVO},
};
pub struct SignerRemote {
addr: String,
event_source_terminator: Option<tokio::sync::mpsc::Sender<()>>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum EventData {
NewCrdtEvents,
NewEnvelopes,
}
impl SignerRemote {
pub fn new(addr: &str) -> Self {
Self {
addr: addr.to_string(),
event_source_terminator: None,
}
}
pub async fn ping(&self, user: &SignerUser) -> crate::DaemonResult<()> {
let config = HttpClientConfig::new(user.clone(), self.addr.clone());
let client = HttpClient::new(config);
let _response: serde_json::Value = client.get("/api").await?;
Ok(())
}
pub(crate) async fn pull_crdt_event(
&self,
daemon: &mut SignerDaemon,
) -> crate::DaemonResult<()> {
let u = daemon.state.user.clone();
let frontiers = daemon.controller.crdt_event.get_event_frontiers().await?;
let frontiers = serde_json::to_string(&frontiers)?;
let config = HttpClientConfig::new(u.clone(), self.addr.clone());
let client = HttpClient::new(config);
#[derive(serde::Serialize)]
struct QueryParams {
frontiers: Option<String>,
}
let query = QueryParams {
frontiers: Some(frontiers),
};
let r: GetCrdtEventsResponse = client.get_with_query("/api/crdt-events", &query).await?;
let events = r
.data
.into_iter()
.map(|i| i.into())
.collect::<Vec<CrdtCryptedEventVO>>();
let events = events
.into_iter()
.map(|e| e.decrypt(&u).expect("解密 CRDT 加密事件失败"))
.collect::<Vec<CrdtEventVO>>();
if events.len() > 0 {
daemon
.controller
.crdt_event
.insert_events(events)
.await
.map_err(|e| {
crate::DaemonError::Signer(crate::SignerError::Msg(format!(
"CRDT 插入事件失败: {}",
e
)))
})?;
}
Ok(())
}
pub async fn push_crdt_event(&self, daemon: &mut SignerDaemon) -> crate::DaemonResult<()> {
let u = daemon.state.user.clone();
let frontiers = daemon.controller.crdt_event.get_event_frontiers().await?;
let local_events = daemon
.controller
.crdt_event
.get_event_from_frontiers(frontiers)
.await?;
let events = local_events
.into_iter()
.map(|e| CrdtCryptedEventVO::from_data(&u, &e).expect("创建 CRDT 加密事件失败"))
.collect::<Vec<CrdtCryptedEventVO>>();
if events.len() > 0 {
let req = PostCrdtEventsRequest {
data: events.into_iter().map(|i| i.into()).collect(),
};
let config = HttpClientConfig::new(u, self.addr.clone());
let client = HttpClient::new(config);
let _response: serde_json::Value = client.post("/api/crdt-events", &req).await?;
}
Ok(())
}
pub async fn sync_crdt_event(&self, daemon: &mut SignerDaemon) -> crate::DaemonResult<()> {
self.pull_crdt_event(daemon).await.map_err(|e| {
crate::DaemonError::Signer(crate::SignerError::Msg(format!("CRDT 拉取失败: {}", e)))
})?;
self.push_crdt_event(daemon).await.map_err(|e| {
crate::DaemonError::Signer(crate::SignerError::Msg(format!("CRDT 推送失败: {}", e)))
})?;
daemon.controller.crdt_adapter.apply_all().await?;
daemon.sync_user_public().await?;
Ok(())
}
pub async fn sync_envelopes(&self, daemon: &mut SignerDaemon) -> crate::DaemonResult<()> {
self.pull_message(daemon).await.map_err(|e| {
crate::DaemonError::Signer(crate::SignerError::Msg(format!("远程拉取消息失败: {}", e)))
})
}
pub async fn pull_message(&self, daemon: &mut SignerDaemon) -> crate::DaemonResult<()> {
let user = daemon.state.user.clone();
let config = HttpClientConfig::new(user.clone(), self.addr.clone());
let client = HttpClient::new(config);
let r: GetEnvelopesResponse = client.get("/api/envelopes").await?;
let mut success_set: HashSet<i32> = HashSet::new();
for e in r.data {
let id = e.id;
let envelope: Envelope = e.into();
let message = match envelope.open(&user) {
Ok(val) => val.message,
Err(_) => continue,
};
daemon.store.message.put(message).await?;
success_set.insert(id);
}
let ids = success_set.into_iter().collect::<Vec<i32>>();
let config = HttpClientConfig::new(user.clone(), self.addr.clone());
let client = HttpClient::new(config);
let _response: serde_json::Value = client.patch("/api/envelopes/check", &ids).await?;
Ok(())
}
pub async fn push_message(
&self,
daemon: &mut SignerDaemon,
data: Vec<MessageVO>,
) -> crate::DaemonResult<()> {
let mut envelopes = Vec::new();
for i in data {
let envelope = Envelope::create(&daemon, &i, vec![self.addr.clone()]).await?;
envelopes.push(envelope);
}
let user = &daemon.state.user;
for e in envelopes {
e.send(user).await?;
}
Ok(())
}
pub async fn pull_user(
&self,
daemon: &mut SignerDaemon,
user_key: &str,
) -> crate::DaemonResult<()> {
let user = daemon.state.user.clone();
let config = HttpClientConfig::new(user.clone(), self.addr.clone());
let client = HttpClient::new(config);
let r: OApiSignerSignedOApiSignerUserPublic =
client.get(&format!("/api/users/{}", user_key)).await?;
let signed_up = SignerSigned::<SignerUserPublic> {
sig: r.sig,
msg: r.msg,
pubkey: r.pubkey,
_marker: Default::default(),
};
let up = signed_up.verify_to_value()?;
let user_vo = UserVO {
pub_key: up.pub_key.clone(),
user_public: serde_json::to_string(&up)?,
signed_user_public: serde_json::to_string(&signed_up)?,
};
daemon.store.user.put(&mut daemon.state, user_vo).await?;
Ok(())
}
pub async fn open_eventsource(
&mut self,
daemon: Arc<RwLock<SignerDaemon>>,
) -> crate::DaemonResult<()> {
if self.event_source_terminator.is_some() {
return Err(crate::DaemonError::Signer(crate::SignerError::Msg(
"事件源已存在".to_string(),
)));
}
let (tx, mut es) = SignerRemoteEventSource::new(&self.addr, daemon.clone());
self.event_source_terminator = Some(tx);
tokio::spawn(async move {
es.open_eventsource().await.expect("打开事件源失败");
});
Ok(())
}
pub async fn close(&mut self) {
if let Some(event_source) = &mut self.event_source_terminator {
let _ = event_source.send(()).await;
self.event_source_terminator = None;
}
}
}
pub struct SignerRemoteEventSource {
addr: String,
daemon: Arc<RwLock<SignerDaemon>>,
rx: tokio::sync::mpsc::Receiver<()>,
}
impl SignerRemoteEventSource {
fn new(addr: &str, daemon: Arc<RwLock<SignerDaemon>>) -> (tokio::sync::mpsc::Sender<()>, Self) {
let (tx, rx) = tokio::sync::mpsc::channel(1);
(
tx,
Self {
addr: addr.to_string(),
daemon,
rx,
},
)
}
async fn open_eventsource(&mut self) -> crate::DaemonResult<()> {
let u = self.daemon.read().await.state.user.clone();
let remote = SignerRemote::new(&self.addr);
'outer: loop {
let jwt = SignerJWT::new(
SignerJWTHeader::default(&u),
SignerJWTClaims::default(&u, self.addr.clone(), uuid::Uuid::new_v4().to_string())
.with_expired_duration(chrono::Duration::minutes(5)),
);
let client =
eventsource_client::ClientBuilder::for_url(&format!("{}/api/events", &self.addr))
.map_err(|e| {
crate::DaemonError::Signer(crate::SignerError::Msg(format!(
"创建事件源客户端失败: {}",
e
)))
})?
.header(
"Authorization",
&format!("Bearer {}", &jwt.encode(&u).unwrap()),
)
.map_err(|e| {
crate::DaemonError::Signer(crate::SignerError::Msg(format!(
"添加请求头失败: {}",
e
)))
})?
.build_http();
let mut stream = Box::pin(client.stream());
{
let daemon = &mut *self.daemon.write().await;
remote.sync_crdt_event(daemon).await?;
remote.sync_envelopes(daemon).await?;
}
'inner: loop {
let e = tokio::select! {
event = stream.next() => event,
_ = self.rx.recv() => {
break 'outer;
},
};
let e = match e {
None => continue,
Some(e) => match e {
Err(err) => {
tracing::error!("从 {} 接收消息错误: {}. 正在重连.", &self.addr, err);
break 'inner;
}
Ok(e) => e,
},
};
let e = match e {
SSE::Event(e) => e,
_ => continue,
};
let ed: EventData = match serde_json::from_str(&e.data) {
Err(_) => continue,
Ok(ed) => ed,
};
match ed {
EventData::NewCrdtEvents => {
remote
.sync_crdt_event(&mut *self.daemon.write().await)
.await
.expect("同步 CRDT 事件失败");
}
EventData::NewEnvelopes => {
remote
.sync_envelopes(&mut *self.daemon.write().await)
.await
.expect("同步信封失败");
}
}
}
}
Ok(())
}
}
#[cfg(test)]
mod test {
use signer_core::SignerUser;
use crate::{SignerDaemon, model::viewobject::ServerVO, signer_remote::SignerRemote};
#[tokio::test]
async fn test_signer_connection() -> crate::DaemonResult<()> {
let alice = SignerUser::generete("alice")?;
let alice_daemon = SignerDaemon::from_memory(&alice, "test").await?;
{
let daemon = &mut *alice_daemon.write().await;
daemon
.store
.server
.put(
&mut daemon.state,
ServerVO {
addr: "http://127.0.0.1:8080".to_string(),
enable: true,
limitation_set: "{}".to_string(),
},
)
.await?;
}
alice_daemon.write().await.finalize().await?;
Ok(())
}
#[tokio::test]
async fn test_pull_user() -> crate::DaemonResult<()> {
let u = SignerUser::generete("alice")?;
let peer = uuid::Uuid::new_v4().to_string();
let daemon = SignerDaemon::from_memory(&u, &peer).await?;
let remote = SignerRemote::new("http://localhost:8080");
remote.ping(&u).await?;
remote
.pull_user(&mut *daemon.write().await, &u.public.pub_key)
.await?;
let target = daemon
.read()
.await
.store
.user
.get(&u.public.pub_key)
.await?;
assert!(target.is_some());
Ok(())
}
}