use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use async_trait::async_trait;
use serde_json::Value;
use tracing::{info, warn, error};
use super::state::State;
use moduforge_transform::draft::Draft;
use moduforge_transform::patch::Patch;
use moduforge_model::node::Node;
use moduforge_model::node_pool::{NodePool};
use moduforge_model::schema::Schema;
use moduforge_transform::attr_step::AttrStep;
use moduforge_transform::node_step::AddNodeStep;
use moduforge_transform::step::{Step, StepResult};
use moduforge_transform::transform::{Transform, TransformError};
use moduforge_transform::{ConcreteStep, PatchStep};
use std::fmt::Debug;
static IDS: AtomicU64 = AtomicU64::new(1);
pub fn get_transaction_id() -> u64 {
IDS.fetch_add(1, Ordering::SeqCst)
}
#[async_trait]
pub trait Command: Send + Sync + Debug {
async fn execute(
&self,
tr: &mut Transaction,
) -> Result<(), TransformError>;
fn name(&self) -> String;
}
#[derive(Debug, Clone)]
pub struct Transaction {
pub meta: im::HashMap<String, Arc<dyn std::any::Any>>,
pub id: u64,
pub steps: im::Vector<Arc<dyn Step>>,
pub patches: im::Vector<Vec<Patch>>,
pub doc: Arc<NodePool>,
pub draft: Draft,
pub schema: Arc<Schema>,
}
unsafe impl Send for Transaction {}
unsafe impl Sync for Transaction {}
impl Transform for Transaction {
fn step(
&mut self,
step: Arc<dyn Step>,
) -> Result<(), TransformError> {
let result = step.apply(&mut self.draft, self.schema.clone())?;
match result.failed {
Some(message) => Err(TransformError::new(message)),
None => {
self.add_step(step, result);
Ok(())
},
}
}
fn doc_changed(&self) -> bool {
!self.steps.is_empty()
}
fn add_step(
&mut self,
step: Arc<dyn Step>,
result: StepResult,
) {
self.steps.push_back(step);
self.patches.push_back(result.patches);
self.doc = result.doc.unwrap();
}
}
impl Transaction {
pub async fn transaction(
&mut self,
call_back: Arc<dyn Command>,
) {
info!("开始执行事务: {}", call_back.name());
self.draft.begin = true;
let result = call_back.execute(self).await;
self.draft.begin = false;
match result {
Ok(_) => {
info!("事务执行成功,正在提交更改");
let result = self.draft.commit();
self.add_step(
Arc::new(PatchStep { patches: result.patches.clone() }),
result,
);
},
Err(e) => {
error!("事务执行失败: {}", e);
warn!("事务回滚");
},
}
}
pub fn new(state: &State) -> Self {
let node = state.doc();
Transaction {
meta: im::HashMap::new(),
id: get_transaction_id(),
steps: im::Vector::new(),
doc: node,
schema: state.schema(),
draft: Draft::new(state.doc()),
patches: im::Vector::new(),
}
}
pub fn doc(&self) -> Arc<NodePool> {
self.doc.clone()
}
pub fn as_concrete(step: &Arc<dyn Step>) -> ConcreteStep {
step.to_concrete()
}
pub fn set_node_attribute(
&mut self,
id: String,
values: im::HashMap<String, Value>,
) {
let _ = self.step(Arc::new(AttrStep::new(id, values)));
}
pub fn add_node(
&mut self,
parent_id: String,
nodes: Vec<Node>,
) {
let _ = self.step(Arc::new(AddNodeStep::new(parent_id, nodes)));
}
pub fn set_time(
&mut self,
id: u64,
) -> &mut Self {
self.id = id;
self
}
pub fn set_meta<K, T: std::any::Any>(
&mut self,
key: K,
value: T,
) -> &mut Self
where
K: Into<String>,
{
let key_str = key.into();
self.meta.insert(key_str, Arc::new(value));
self
}
pub fn get_meta<T: 'static, K>(
&self,
key: K,
) -> Option<&T>
where
K: Into<String>,
{
let key_str = key.into();
self.meta.get(&key_str)?.downcast_ref::<T>()
}
}