signer-crdt 0.4.1

Signer CRDT (Conflict-free Replicated Data Type) package.
Documentation
use sea_orm::{
    ActiveModelTrait as _,
    ActiveValue::Set,
    ColumnTrait, // 添加缺失的 trait 导入
    DatabaseTransaction,
    EntityTrait,
    QueryFilter,
    QueryOrder,
    TransactionTrait,
};
use serde::{Deserialize, Serialize};

use crate::{
    ReconcileError, SignerMeta,
    crdt::errors::CrdtError,
    delta::{chat_do::ChatDO, message_do::MessageDO, user_do::UserDO},
    entity::crdt_event,
};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum CrdtDelta<T: CrdtType> {
    Put(T),
    Del(T::PrimaryKey),
    Skip,
}

impl<T> CrdtDelta<T>
where
    T: CrdtType,
{
    async fn apply(
        &self,
        core: &SignerMeta,
        tx: &DatabaseTransaction,
    ) -> Result<CrdtDelta<T>, CrdtError> {
        match self {
            CrdtDelta::Put(value) => value.put(core, tx).await,
            CrdtDelta::Del(key) => T::del(core, &key, tx).await,
            CrdtDelta::Skip => Ok(CrdtDelta::Skip),
        }
    }
}

pub trait CrdtType
where
    Self: Sized,
{
    type PrimaryKey;

    fn put(
        &self,
        core: &SignerMeta,
        tx: &DatabaseTransaction,
    ) -> impl std::future::Future<Output = Result<CrdtDelta<Self>, CrdtError>> + Send;
    fn del(
        core: &SignerMeta,
        key: &Self::PrimaryKey,
        tx: &DatabaseTransaction,
    ) -> impl std::future::Future<Output = Result<CrdtDelta<Self>, CrdtError>> + Send;
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum CrdtDeltaBox {
    User(CrdtDelta<UserDO>),
    Chat(CrdtDelta<ChatDO>),
    Message(CrdtDelta<MessageDO>),
}

impl CrdtDeltaBox {
    async fn apply(&self, core: &SignerMeta, tx: &DatabaseTransaction) -> Result<Self, CrdtError> {
        let out = match self {
            CrdtDeltaBox::User(crdt_delta) => CrdtDeltaBox::User(crdt_delta.apply(core, tx).await?),
            CrdtDeltaBox::Chat(crdt_delta) => CrdtDeltaBox::Chat(crdt_delta.apply(core, tx).await?),
            CrdtDeltaBox::Message(crdt_delta) => {
                CrdtDeltaBox::Message(crdt_delta.apply(core, tx).await?)
            }
        };
        Ok(out)
    }

    pub async fn insert(&self, core: &SignerMeta) -> Result<crdt_event::Model, CrdtError> {
        let r = crdt_event::Entity::find()
            .order_by_desc(crdt_event::Column::Clock)
            .one(&core.conn)
            .await?;
        let clock = match r {
            Some(r) => r.clock + 1,
            None => 1,
        };

        // 查询当前 peer 最后应用的事件 clock 作为依赖
        // 注意:这里我们查找的是 revert 不为 NULL 的事件,即已成功应用的事件
        let last_applied_event = crdt_event::Entity::find()
            .filter(crdt_event::Column::Peer.eq(&core.keys.pub_key))
            .filter(crdt_event::Column::Revert.is_not_null())
            .order_by_desc(crdt_event::Column::Clock)
            .one(&core.conn)
            .await?;

        let deps: Option<serde_json::Value> = match last_applied_event {
            Some(event) => {
                // 如果存在最后应用的事件,则将其 clock 作为依赖
                Some(serde_json::json!([event.clock]))
            }
            None => {
                // 如果没有已应用的事件(例如这是第一个事件),则依赖为空数组
                Some(serde_json::json!([]))
            }
        };

        let payload = serde_json::to_string(&self)?;

        let m = crdt_event::ActiveModel {
            clock: Set(clock),
            peer: Set(core.keys.pub_key.clone()),
            payload: Set(payload),
            revert: Set(None),
            deps: Set(deps), // 设置 deps 字段
        };

        let r = m.insert(&core.conn).await?;
        // tracing::debug!("INSERT CrdtDeltaBox {:?}", self);

        Ok(r)
    }
}

impl crdt_event::Model {
    pub async fn apply(&self, core: &SignerMeta) -> Result<(), crate::crdt::errors::CrdtError> {
        let tx = core.conn.begin().await.map_err(|e| {
            crate::crdt::errors::CrdtError::TransactionStartFailed(format!(
                "start transaction failed: {}",
                e
            ))
        })?;

        // 这里需要匹配 CrdtDeltaBox 的新定义
        let delta_box: CrdtDeltaBox = serde_json::from_str(&self.payload)?;
        let out_box = delta_box.apply(core, &tx).await?;

        let revert = serde_json::to_string(&out_box)?;

        let mut m = crdt_event::ActiveModel::from(self.clone());
        m.revert = Set(Some(revert));
        m.update(&tx).await?;

        tx.commit().await?;

        Ok(())
    }

    pub async fn revert(&self, core: &SignerMeta) -> Result<(), crate::crdt::errors::CrdtError> {
        let tx = core.conn.begin().await?;

        let revert = match &self.revert {
            None => {
                return Err(crate::crdt::errors::CrdtError::InvalidParams(
                    "revert field is None".to_string(),
                ));
            }
            Some(val) => val,
        };

        // 这里需要匹配 CrdtDeltaBox 的新定义
        let delta_box: CrdtDeltaBox = serde_json::from_str(revert)?;
        delta_box.apply(core, &tx).await?;

        let mut m = crdt_event::ActiveModel::from(self.clone());
        m.revert = Set(None);
        m.update(&tx).await?;

        tx.commit().await?;

        Ok(())
    }
}

/// 对 CRDT 事件进行协调,确保所有事件都正确应用
/// 使用 Event Graph Walker 算法,基于 deps 字段确定依赖关系
pub async fn reconcile(meta: &SignerMeta) -> Result<(), ReconcileError> {
    use crate::entity::crdt_event;
    use sea_orm::prelude::*; // 移除未使用的 QueryOrder as _
    use std::collections::HashSet; // 移除未使用的 HashMap

    // 1. 获取所有未应用的事件 (revert 为 NULL)
    let unapplied_events: Vec<crdt_event::Model> = crdt_event::Entity::find()
        .filter(crdt_event::Column::Revert.is_null())
        .all(&meta.conn)
        .await?;

    // 2. 获取所有已应用的事件 (revert 不为 NULL),并构建一个集合用于快速查找
    let applied_events: Vec<crdt_event::Model> = crdt_event::Entity::find()
        .filter(crdt_event::Column::Revert.is_not_null())
        .all(&meta.conn)
        .await?;

    let mut applied_clocks: HashSet<i32> = applied_events.iter().map(|e| e.clock).collect();
    // 注:_event_map 在当前实现中未使用,但保留以备将来扩展
    // let _event_map: HashMap<i32, crdt_event::Model> = unapplied_events
    //     .iter()
    //     .chain(applied_events.iter())
    //     .map(|e| (e.clock, e.clone()))
    //     .collect();

    // 3. Event Graph Walker: 循环处理未应用事件,直到没有新的事件可以被应用
    let mut unapplied_events_to_process = unapplied_events;
    loop {
        let mut applied_in_this_round = false;
        let mut remaining_events = Vec::new();

        for event in unapplied_events_to_process {
            // 解析依赖
            let deps_clocks: Vec<i32> = match &event.deps {
                Some(deps_val) => {
                    serde_json::from_value(deps_val.clone()).unwrap_or_else(|_| vec![])
                }
                None => vec![],
            };

            // 检查所有依赖是否都已应用
            let all_deps_satisfied = deps_clocks.iter().all(|&dep_clock| {
                // 如果依赖是 0 或负数,视为空依赖(可能用于表示根事件或特殊依赖)
                if dep_clock <= 0 {
                    true
                } else {
                    applied_clocks.contains(&dep_clock)
                }
            });

            if all_deps_satisfied {
                event.apply(meta).await?;
                applied_clocks.insert(event.clock);
                applied_in_this_round = true;
            } else {
                // 依赖不满足,保留到下一轮处理
                remaining_events.push(event);
            }
        }

        // 更新待处理事件列表
        unapplied_events_to_process = remaining_events;

        // 如果这一轮没有应用任何事件,则退出循环
        if !applied_in_this_round {
            break;
        }
    }

    Ok(())
}

#[cfg(test)]
mod tests {
    #[test]
    fn test_reconcile_compiles() {
        // 确保方法签名正确并且能够编译
        // 实际的数据库测试需要在集成测试环境中进行
    }
}