use std::{collections::HashMap, fs};
use anyhow::Context as _;
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QueryOrder, QuerySelect};
use signer_core::{SignerSigned, SignerUser, SignerUserPublic};
use crate::{
entity::{crdt_event, server, user},
model::viewobject::{CrdtCryptedEventVO, CrdtEventVO, UserVO},
reqwest_auth::ReqwestAuth,
signer_daemon::signer_daemon_event::SignerDaemonEvent,
};
use super::SignerDaemon;
impl SignerDaemon {
pub async fn crdt_insert_events(
&self,
events: Vec<CrdtEventVO>,
) -> anyhow::Result<()> {
let m = events
.into_iter()
.map(|i| crdt_event::ActiveModel::from(crdt_event::Model::from(i.into())))
.collect::<Vec<crdt_event::ActiveModel>>();
crdt_event::Entity::insert_many(m).exec(&self.db).await?;
Ok(())
}
pub async fn crdt_apply_all(&self) -> anyhow::Result<()> {
let anchor = crdt_event::Entity::find()
.filter(crdt_event::Column::Revert.is_null())
.order_by_asc(crdt_event::Column::Clock)
.order_by_asc(crdt_event::Column::Peer)
.one(&self.db)
.await
.context("find anchor failed")?;
if let Some(anchor) = &anchor {
let revert_vec = crdt_event::Entity::find()
.filter(
crdt_event::Column::Revert.is_not_null().and(
crdt_event::Column::Clock.gt(anchor.clock).or(
crdt_event::Column::Clock
.eq(anchor.clock)
.and(crdt_event::Column::Peer.gt(&anchor.peer)),
),
),
)
.order_by_desc(crdt_event::Column::Clock)
.order_by_desc(crdt_event::Column::Peer)
.all(&self.db)
.await
.context("find revert events failed")?;
for revert in revert_vec {
revert
.revert(&self)
.await
.context(format!("apply event {:?} failed", revert))?;
}
}
let apply_vec = match anchor {
Some(anchor) => crdt_event::Entity::find()
.filter(
crdt_event::Column::Clock.gt(anchor.clock).or(
crdt_event::Column::Clock
.eq(anchor.clock)
.and(crdt_event::Column::Peer.gte(anchor.peer)),
),
)
.order_by_asc(crdt_event::Column::Clock)
.order_by_asc(crdt_event::Column::Peer)
.all(&self.db)
.await
.context("find apply events failed")?,
None => crdt_event::Entity::find()
.order_by_asc(crdt_event::Column::Clock)
.order_by_asc(crdt_event::Column::Peer)
.all(&self.db)
.await
.context("find apply events failed")?,
};
for apply in apply_vec {
apply
.apply(&self)
.await
.context(format!("apply event {:?} failed", apply))?;
}
self.emit(SignerDaemonEvent::SyncUser)?;
Ok(())
}
pub async fn crdt_get_frontiers(
&self,
) -> anyhow::Result<HashMap<String, i32>> {
let ce_vec: Vec<(i32, String)> = crdt_event::Entity::find()
.select_only()
.column_as(crdt_event::Column::Clock.max(), "clock")
.column(crdt_event::Column::Peer)
.group_by(crdt_event::Column::Peer)
.into_tuple()
.all(&self.db)
.await?;
let mut frontiers = HashMap::new();
for ce in ce_vec {
frontiers.insert(ce.1, ce.0);
}
Ok(frontiers)
}
pub async fn crdt_get_event_from_frontiers(
&self,
frontiers: HashMap<String, i32>,
) -> anyhow::Result<Vec<CrdtEventVO>> {
let query = crdt_event::Entity::find();
let mut filter = crdt_event::Column::Peer.is_null();
for (key, value) in frontiers.iter() {
filter = filter.or(
crdt_event::Column::Clock
.gt(*value)
.and(crdt_event::Column::Peer.eq(key)),
);
}
filter = filter.or(crdt_event::Column::Peer.is_not_in(frontiers.keys()));
let events = query.filter(filter).all(&self.db).await?;
Ok(events.into_iter().map(|i| i.into()).collect())
}
pub async fn crdt_pull(&self, addr: &str) -> anyhow::Result<()> {
let u = self.user.lock().await.clone();
let remote_events = self
.fetch_crdt_event(addr)
.await
.context("fetch crdt event failed")?;
let events = remote_events
.into_iter()
.map(|e| e.decrypt(&u).expect("decrypt crdt crypted event failed"))
.collect::<Vec<CrdtEventVO>>();
if events.len() > 0 {
self
.crdt_insert_events(events)
.await
.context("crdt insert events failed")?;
self
.crdt_apply_all()
.await
.context("crdt apply all failed")?;
}
Ok(())
}
pub async fn crdt_push(&self, addr: &str) -> anyhow::Result<()> {
let u = self.user.lock().await.clone();
let frontiers = self.fetch_crdt_frontiers(addr).await?;
let local_events = self.crdt_get_event_from_frontiers(frontiers).await?;
let events = local_events
.into_iter()
.map(|e| {
CrdtCryptedEventVO::from_data(&u, &e)
.expect("create crdt crypted event failed")
})
.collect::<Vec<CrdtCryptedEventVO>>();
if events.len() > 0 {
self.push_crdt_event(addr, events).await?;
}
Ok(())
}
pub(crate) async fn crdt_sync_user_public(&self) -> anyhow::Result<()> {
let mem_up = self.user.lock().await.public.clone();
let json_up = match &self.user_file_path {
Some(path) => {
let u: SignerUser = serde_json::from_str(&fs::read_to_string(path)?)?;
Some(u.public)
}
None => None,
};
let db_up: Option<SignerUserPublic> = match user::Entity::find()
.filter(user::Column::PubKey.eq(&mem_up.pub_key))
.one(&self.db)
.await?
{
None => None,
Some(m) => Some(serde_json::from_str(&m.user_public)?),
};
let mut up: SignerUserPublic = {
let mut up = mem_up.clone();
if let Some(json_up) = &json_up {
if json_up.update_time > up.update_time {
up = json_up.clone();
}
}
if let Some(db_up) = &db_up {
if db_up.update_time > up.update_time {
up = db_up.clone();
}
}
up
};
{
let servers = server::Entity::find().all(&self.db).await?;
let mut allow_endpoints =
servers.into_iter().map(|s| s.addr).collect::<Vec<String>>();
allow_endpoints.sort();
if allow_endpoints != up.allow_endpoints {
up.allow_endpoints = allow_endpoints;
up.update_time = chrono::Utc::now().timestamp_millis();
}
}
{
if up.update_time > mem_up.update_time {
tracing::debug!("已更新内存中的用户信息");
self.user.lock().await.public = up.clone();
}
if let Some(json_up) = json_up {
if up.update_time > json_up.update_time {
tracing::debug!("已更新配置文件中的用户信息");
let p = self.user_file_path.as_ref().unwrap();
let mut u: SignerUser =
serde_json::from_str(&fs::read_to_string(p)?)?;
u.public = up.clone();
fs::write(p, serde_json::to_string(&u)?)?;
}
}
let db_need_update = match db_up {
None => true,
Some(db_up) => up.update_time > db_up.update_time,
};
if db_need_update {
tracing::debug!("已更新数据库中的用户信息");
self
.put_user(UserVO {
pub_key: up.pub_key.clone(),
user_public: serde_json::to_string(&up)?,
signed_user_public: serde_json::to_string(
&SignerSigned::from_value(&self.user.lock().await.clone(), &up)?,
)?,
})
.await?;
}
}
Ok(())
}
async fn fetch_crdt_event(
&self,
addr: &str,
) -> anyhow::Result<Vec<CrdtCryptedEventVO>> {
let u = self.user.lock().await.clone();
let frontiers = self.crdt_get_frontiers().await?;
let frontiers = serde_json::to_string(&frontiers)?;
let c = reqwest::Client::new();
let response = c
.get(&format!("{}/crdt-events", addr))
.query(&[("frontiers", frontiers)])
.with_signer_auth(&u)
.send()
.await?
.error_for_status()?;
let json = response.json::<serde_json::Value>().await?;
let events: Vec<CrdtCryptedEventVO> = json
.get("data")
.unwrap()
.as_array()
.unwrap()
.iter()
.map(|e| serde_json::from_value(e.clone()).unwrap())
.collect();
Ok(events)
}
async fn fetch_crdt_frontiers(
&self,
addr: &str,
) -> anyhow::Result<HashMap<String, i32>> {
let u = self.user.lock().await.clone();
let c = reqwest::Client::new();
let response = c
.get(&format!("{}/crdt-frontiers", addr))
.with_signer_auth(&u)
.send()
.await?
.error_for_status()?;
let frontiers: HashMap<String, i32> = response.json().await?;
Ok(frontiers)
}
async fn push_crdt_event(
&self,
addr: &str,
events: Vec<CrdtCryptedEventVO>,
) -> anyhow::Result<()> {
let u = self.user.lock().await.clone();
reqwest::Client::new()
.post(&format!("{}/crdt-events", addr))
.with_signer_auth(&u)
.json(&serde_json::json!({
"data": events,
}))
.send()
.await?
.error_for_status()?;
Ok(())
}
}