1use moduforge_model::{
2 id_generator::IdGenerator, mark::Mark, node_pool::NodePool, schema::Schema,
3};
4
5use im::HashMap as ImHashMap;
6use std::{
7 collections::HashMap,
8 sync::{
9 Arc,
10 atomic::{AtomicU64, Ordering},
11 },
12 time::Instant,
13};
14
15use super::{
16 error::{StateError, StateResult},
17 plugin::{Plugin, PluginState},
18 transaction::Transaction,
19};
20
21static VERSION: AtomicU64 = AtomicU64::new(1);
22pub fn get_state_version() -> u64 {
23 VERSION.fetch_add(1, Ordering::SeqCst)
25}
26#[derive(Clone, Debug)]
32pub struct State {
33 pub config: Arc<Configuration>,
34 pub fields_instances: ImHashMap<String, PluginState>,
35 pub node_pool: Arc<NodePool>,
36 pub version: u64,
37}
38
39impl State {
40 pub async fn create(state_config: StateConfig) -> StateResult<State> {
45 tracing::info!("正在创建新的state");
46 let schema = match &state_config.schema {
47 Some(schema) => schema.clone(),
48 None => state_config.schema.clone().ok_or_else(|| {
49 StateError::SchemaError("Schema is required".to_string())
50 })?,
51 };
52 let config = Configuration::new(
53 schema,
54 state_config.plugins.clone(),
55 state_config.doc.clone(),
56 );
57 let mut instance = State::new(Arc::new(config));
58 let mut field_values = Vec::new();
59 for plugin in &instance.config.plugins {
60 if let Some(field) = &plugin.spec.state {
61 tracing::debug!("正在初始化插件状态: {}", plugin.key);
62 let value = field.init(&state_config, Some(&instance)).await;
63 field_values.push((plugin.key.clone(), value));
64 }
65 }
66 for (name, value) in field_values {
67 instance.set_field(&name, value)?;
68 }
69 tracing::info!("state创建成功");
70 Ok(instance)
71 }
72 pub fn new(config: Arc<Configuration>) -> Self {
76 let doc: Arc<NodePool> = match &config.doc {
77 Some(doc) => doc.clone(),
78 None => {
79 let id = IdGenerator::get_id();
80 let nodes = config
81 .schema
82 .top_node_type
83 .clone()
84 .unwrap()
85 .create_and_fill(
86 Some(id.clone()),
87 None,
88 vec![],
89 None,
90 &config.schema,
91 );
92 NodePool::from(nodes, id).into()
93 },
94 };
95
96 State {
97 fields_instances: ImHashMap::new(),
98 config,
99 node_pool: doc,
100 version: get_state_version(), }
102 }
103 pub fn doc(&self) -> Arc<NodePool> {
104 Arc::clone(&self.node_pool)
105 }
106
107 pub fn schema(&self) -> Arc<Schema> {
108 Arc::clone(&self.config.schema)
109 }
110
111 pub fn plugins(&self) -> &Vec<Arc<Plugin>> {
112 &self.config.plugins
113 }
114
115 pub fn sorted_plugins(&self) -> &Vec<Arc<Plugin>> {
118 &self.config.plugins
120 }
121
122 pub async fn apply(
124 &self,
125 transaction: Transaction,
126 ) -> StateResult<TransactionResult> {
127 let start_time = Instant::now();
128 let initial_step_count = transaction.steps.len();
129 tracing::info!("开始应用事务,初始步骤数: {}", initial_step_count);
130 let result = self.apply_transaction(transaction).await?;
132 let duration = start_time.elapsed();
134 tracing::debug!("事务应用成功,步骤数保持不变,耗时: {:?}", duration);
135 Ok(result)
136 }
137
138 pub async fn filter_transaction(
139 &self,
140 tr: &Transaction,
141 ignore: Option<usize>,
142 ) -> StateResult<bool> {
143 let sorted_plugins = self.sorted_plugins();
145
146 for (i, plugin) in sorted_plugins.iter().enumerate() {
147 if Some(i) != ignore
148 && !plugin.apply_filter_transaction(tr, self).await
149 {
150 return Ok(false);
151 }
152 }
153 Ok(true)
154 }
155
156 pub async fn apply_transaction(
158 &self,
159 root_tr: Transaction,
160 ) -> StateResult<TransactionResult> {
161 tracing::info!("开始应用事务");
162 if !self.filter_transaction(&root_tr, None).await? {
163 tracing::debug!("事务被过滤,返回原始状态");
164 return Ok(TransactionResult {
165 state: self.clone(),
166 transactions: vec![root_tr],
167 });
168 }
169
170 let mut trs = Vec::new();
171 let mut new_state: State = self.apply_inner(&root_tr).await?;
172 trs.push(root_tr.clone());
173 let mut seen: Option<Vec<SeenState>> = None;
174
175 let sorted_plugins = self.sorted_plugins();
177
178 loop {
179 let mut have_new = false;
180 for (i, plugin) in sorted_plugins.iter().enumerate() {
181 let n: usize = seen.as_ref().map(|s| s[i].n).unwrap_or(0);
182 let old_state =
183 seen.as_ref().map(|s| &s[i].state).unwrap_or(self);
184 if n < trs.len() {
185 if let Some(mut tr) = plugin
186 .apply_append_transaction(
187 &trs[n..],
188 old_state,
189 &new_state,
190 )
191 .await
192 {
193 if new_state.filter_transaction(&tr, Some(i)).await? {
194 tr.set_meta("appendedTransaction", root_tr.clone());
195 if seen.is_none() {
196 let mut s: Vec<SeenState> = Vec::new();
197 for j in 0..sorted_plugins.len() {
198 s.push(if j < i {
199 SeenState {
200 state: new_state.clone(),
201 n: trs.len(),
202 }
203 } else {
204 SeenState { state: self.clone(), n: 0 }
205 });
206 }
207 seen = Some(s);
208 }
209 tracing::debug!(
210 "插件 {} 添加了新事务",
211 plugin.spec.key.1
212 );
213 new_state = new_state.apply_inner(&tr).await?;
214 trs.push(tr);
215 have_new = true;
216 }
217 }
218 }
219 if let Some(seen) = &mut seen {
220 seen[i] =
221 SeenState { state: new_state.clone(), n: trs.len() };
222 }
223 }
224
225 if !have_new {
226 tracing::info!("事务应用完成,共 {} 个步骤", trs.len());
227 return Ok(TransactionResult {
228 state: new_state,
229 transactions: trs,
230 });
231 }
232 }
233 }
234
235 pub async fn apply_inner(
237 &self,
238 tr: &Transaction,
239 ) -> StateResult<State> {
240 let mut config = self.config.as_ref().clone();
241 config.doc = Some(tr.doc.clone());
242 let mut new_instance = State::new(Arc::new(config));
243
244 let sorted_plugins = self.sorted_plugins();
246
247 for plugin in sorted_plugins.iter() {
248 if let Some(field) = &plugin.spec.state {
249 if let Some(old_plugin_state) = self.get_field(&plugin.key) {
250 let value = field
251 .apply(tr, old_plugin_state, self, &new_instance)
252 .await;
253 new_instance.set_field(&plugin.key, value)?;
254 }
255 }
256 }
257 Ok(new_instance)
258 }
259
260 #[must_use]
261 pub fn tr(&self) -> Transaction {
262 Transaction::new(self)
263 }
264
265 pub async fn reconfigure(
266 &self,
267 state_config: StateConfig,
268 ) -> StateResult<State> {
269 tracing::info!("正在重新配置状态");
270 let config = Configuration::new(
271 self.schema(),
272 state_config.plugins.clone(),
273 state_config.doc.clone(),
274 );
275 let mut instance = State::new(Arc::new(config));
276 let mut field_values = Vec::new();
277 for plugin in &instance.config.plugins {
278 if let Some(field) = &plugin.spec.state {
279 let key = plugin.key.clone();
280 tracing::debug!("正在重新配置插件: {}", key);
281 let value = if self.has_field(&key) {
282 if let Some(old_plugin_state) = self.get_field(&key) {
283 old_plugin_state
284 } else {
285 field.init(&state_config, Some(&instance)).await
286 }
287 } else {
288 field.init(&state_config, Some(&instance)).await
289 };
290 field_values.push((key, value));
291 }
292 }
293 for (name, value) in field_values {
294 instance.set_field(&name, value)?;
295 }
296 tracing::info!("状态重新配置完成");
297 Ok(instance)
298 }
299
300 pub fn get_field(
301 &self,
302 name: &str,
303 ) -> Option<PluginState> {
304 self.fields_instances.get(name).cloned()
305 }
306
307 pub fn set_field(
308 &mut self,
309 name: &str,
310 value: PluginState,
311 ) -> StateResult<()> {
312 self.fields_instances.insert(name.to_owned(), value);
313 Ok(())
314 }
315
316 pub fn has_field(
317 &self,
318 name: &str,
319 ) -> bool {
320 self.fields_instances.contains_key(name)
321 }
322}
323pub struct StateConfig {
329 pub schema: Option<Arc<Schema>>,
330 pub doc: Option<Arc<NodePool>>,
331 pub stored_marks: Option<Vec<Mark>>,
332 pub plugins: Option<Vec<Arc<Plugin>>>,
333}
334
335pub struct SeenState {
336 state: State,
337 n: usize,
338}
339#[derive(Debug, Clone)]
340pub struct TransactionResult {
341 pub state: State,
342 pub transactions: Vec<Transaction>,
343}
344#[derive(Clone, Debug)]
350pub struct Configuration {
351 plugins: Vec<Arc<Plugin>>,
352 plugins_by_key: HashMap<String, Arc<Plugin>>,
353 pub doc: Option<Arc<NodePool>>,
354 schema: Arc<Schema>,
355}
356
357impl Configuration {
358 pub fn new(
359 schema: Arc<Schema>,
360 plugins: Option<Vec<Arc<Plugin>>>,
361 doc: Option<Arc<NodePool>>,
362 ) -> Self {
363 let mut config = Configuration {
364 doc,
365 plugins: Vec::new(),
366 plugins_by_key: HashMap::new(),
367 schema,
368 };
369
370 if let Some(plugin_list) = plugins {
371 let mut sorted_plugins = plugin_list;
373 sorted_plugins
374 .sort_by(|a, b| a.spec.priority.cmp(&b.spec.priority));
375
376 for plugin in sorted_plugins {
377 let key = plugin.key.clone();
378 if config.plugins_by_key.contains_key(&key) {
379 panic!("插件请不要重复添加 ({})", key);
380 }
381 config.plugins.push(plugin.clone());
382 config.plugins_by_key.insert(key, plugin);
383 }
384 }
385 config
386 }
387}