1use std::any::Any;
2use std::ops::{Deref, DerefMut};
3use std::sync::Arc;
4
5use async_trait::async_trait;
6use mf_model::mark::Mark;
7use mf_model::node_definition::NodeTree;
8use mf_model::types::NodeId;
9use mf_transform::TransformResult;
10use serde_json::Value;
11
12use super::state::State;
13use mf_model::node_pool::NodePool;
14use mf_transform::attr_step::AttrStep;
15use mf_transform::node_step::{AddNodeStep, RemoveNodeStep};
16use mf_transform::mark_step::{AddMarkStep, RemoveMarkStep};
17use mf_transform::transform::{Transform};
18use std::fmt::Debug;
19use std::sync::atomic::{AtomicU64, Ordering};
20use mf_model::rpds::{HashTrieMapSync};
21
22#[async_trait]
25pub trait Command: Send + Sync + Debug {
26 async fn execute(
27 &self,
28 tr: &mut Transaction,
29 ) -> TransformResult<()>;
30 fn name(&self) -> String;
31}
32static VERSION: AtomicU64 = AtomicU64::new(1);
33pub fn get_tr_id() -> u64 {
34 VERSION.fetch_add(1, Ordering::SeqCst)
36}
37#[derive(Clone)]
39pub struct Transaction {
40 pub meta: HashTrieMapSync<String, Arc<dyn Any + Send + Sync>>,
42 pub id: u64,
43 transform: Transform,
44}
45impl Debug for Transaction {
46 fn fmt(
47 &self,
48 f: &mut std::fmt::Formatter<'_>,
49 ) -> std::fmt::Result {
50 write!(f, "Transaction {{ id: {}}}", self.id)
51 }
52}
53
54impl Deref for Transaction {
55 type Target = Transform;
56
57 fn deref(&self) -> &Self::Target {
58 &self.transform
59 }
60}
61
62impl DerefMut for Transaction {
63 fn deref_mut(&mut self) -> &mut Self::Target {
64 &mut self.transform
65 }
66}
67
68impl Transaction {
69 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(state), fields(
73 crate_name = "state",
74 state_version = state.version,
75 doc_size = state.node_pool.size()
76 )))]
77 pub fn new(state: &State) -> Self {
78 let node = state.doc();
79 let schema = state.schema();
80 let tr = Transaction {
81 meta: HashTrieMapSync::new_sync(),
82 id: get_tr_id(), transform: Transform::new(node, schema),
84 };
85 #[cfg(feature = "dev-tracing")]
86 tracing::debug!(tr_id = %tr.id, "事务创建成功");
87 tr
88 }
89 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, other), fields(
90 crate_name = "state",
91 self_tr_id = %self.id,
92 other_tr_id = %other.id,
93 self_steps = self.steps.len(),
94 other_steps = other.steps.len()
95 )))]
96 pub fn merge(
97 &mut self,
98 other: &mut Self,
99 ) {
100 let steps_to_apply: Vec<_> = other.steps.iter().cloned().collect();
102 if let Err(e) = self.apply_steps_batch(steps_to_apply) {
103 #[cfg(feature = "dev-tracing")]
104 tracing::error!(error = %e, "批量应用步骤失败");
105 eprintln!("批量应用步骤失败: {e}");
106 } else {
107 #[cfg(feature = "dev-tracing")]
108 tracing::debug!(total_steps = self.steps.len(), "事务合并成功");
109 }
110 }
111 pub fn doc(&self) -> Arc<NodePool> {
113 self.transform.doc()
114 }
115 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, values), fields(
119 crate_name = "state",
120 tr_id = %self.id,
121 node_id = %id,
122 attr_count = values.len()
123 )))]
124 pub fn set_node_attribute(
125 &mut self,
126 id: NodeId,
127 values: HashTrieMapSync<String, Value>,
128 ) -> TransformResult<()> {
129 self.step(Arc::new(AttrStep::new(id, values)))?;
130 Ok(())
131 }
132 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, nodes), fields(
136 crate_name = "state",
137 tr_id = %self.id,
138 parent_id = %parent_id,
139 node_count = nodes.len()
140 )))]
141 pub fn add_node(
142 &mut self,
143 parent_id: NodeId,
144 nodes: Vec<NodeTree>,
145 ) -> TransformResult<()> {
146 self.step(Arc::new(AddNodeStep::new(parent_id, nodes)))?;
147 Ok(())
148 }
149 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, node_ids), fields(
153 crate_name = "state",
154 tr_id = %self.id,
155 parent_id = %parent_id,
156 remove_count = node_ids.len()
157 )))]
158 pub fn remove_node(
159 &mut self,
160 parent_id: NodeId,
161 node_ids: Vec<NodeId>,
162 ) -> TransformResult<()> {
163 self.step(Arc::new(RemoveNodeStep::new(parent_id, node_ids)))?;
164 Ok(())
165 }
166 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, marks), fields(
170 crate_name = "state",
171 tr_id = %self.id,
172 node_id = %id,
173 mark_count = marks.len()
174 )))]
175 pub fn add_mark(
176 &mut self,
177 id: NodeId,
178 marks: Vec<Mark>,
179 ) -> TransformResult<()> {
180 self.step(Arc::new(AddMarkStep::new(id, marks)))?;
181 Ok(())
182 }
183 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, mark_types), fields(
187 crate_name = "state",
188 tr_id = %self.id,
189 node_id = %id,
190 mark_type_count = mark_types.len()
191 )))]
192 pub fn remove_mark(
193 &mut self,
194 id: NodeId,
195 mark_types: Vec<String>,
196 ) -> TransformResult<()> {
197 self.step(Arc::new(RemoveMarkStep::new(id, mark_types)))?;
198 Ok(())
199 }
200 pub fn set_meta<K, T: Send + Sync + 'static>(
204 &mut self,
205 key: K,
206 value: T,
207 ) -> &mut Self
208 where
209 K: Into<String>,
210 {
211 let key_str = key.into();
212 self.meta.insert_mut(key_str, Arc::new(value));
213 self
214 }
215 pub fn get_meta<T: Clone + 'static>(
219 &self,
220 key: &str,
221 ) -> Option<T> {
222 let value = self.meta.get(key)?;
223
224 value.downcast_ref::<T>().cloned()
225 }
226}