1use moduforge_model::{
2 id_generator::IdGenerator, mark::Mark, node_pool::NodePool, schema::Schema,
3};
4
5use im::HashMap as ImHashMap;
6use std::{
7 collections::HashMap,
8 sync::{
9 Arc,
10 atomic::{AtomicU64, Ordering},
11 },
12 time::Instant,
13};
14
15use super::{
16 error::{StateError, StateResult},
17 plugin::{Plugin, PluginState},
18 transaction::Transaction,
19};
20
/// Process-wide counter that backs [`get_state_version`]; its first issued value is 1.
static VERSION: AtomicU64 = AtomicU64::new(1);

/// Hands out the next state version number.
///
/// Returns the counter's current value and atomically advances it by one, so
/// consecutive calls observe strictly consecutive numbers (until `u64` wraps).
pub fn get_state_version() -> u64 {
    const STEP: u64 = 1;
    VERSION.fetch_add(STEP, Ordering::SeqCst)
}
26#[derive(Clone, Debug)]
32pub struct State {
33 pub config: Arc<Configuration>,
34 pub fields_instances: ImHashMap<String, PluginState>,
35 pub node_pool: Arc<NodePool>,
36 pub version: u64,
37}
38
39impl State {
40 pub async fn create(state_config: StateConfig) -> StateResult<State> {
45 tracing::info!("正在创建新的state");
46 let schema = match &state_config.schema {
47 Some(schema) => schema.clone(),
48 None => state_config.schema.clone().ok_or_else(|| {
49 StateError::SchemaError("Schema is required".to_string())
50 })?,
51 };
52 let config = Configuration::new(
53 schema,
54 state_config.plugins.clone(),
55 state_config.doc.clone(),
56 );
57 let mut instance = State::new(Arc::new(config));
58 let mut field_values = Vec::new();
59 for plugin in &instance.config.plugins {
60 if let Some(field) = &plugin.spec.state {
61 tracing::debug!("正在初始化插件状态: {}", plugin.key);
62 let value = field.init(&state_config, Some(&instance)).await;
63 field_values.push((plugin.key.clone(), value));
64 }
65 }
66 for (name, value) in field_values {
67 instance.set_field(&name, value)?;
68 }
69 tracing::info!("state创建成功");
70 Ok(instance)
71 }
72 pub fn new(config: Arc<Configuration>) -> Self {
76 let doc: Arc<NodePool> = match &config.doc {
77 Some(doc) => doc.clone(),
78 None => {
79 let id = IdGenerator::get_id();
80 let nodes = config
81 .schema
82 .top_node_type
83 .clone()
84 .unwrap()
85 .create_and_fill(
86 Some(id.clone()),
87 None,
88 vec![],
89 None,
90 &config.schema,
91 );
92 NodePool::from(nodes, id).into()
93 },
94 };
95
96 State {
97 fields_instances: ImHashMap::new(),
98 config,
99 node_pool: doc,
100 version: get_state_version(), }
102 }
103 pub fn doc(&self) -> Arc<NodePool> {
104 Arc::clone(&self.node_pool)
105 }
106
107 pub fn schema(&self) -> Arc<Schema> {
108 Arc::clone(&self.config.schema)
109 }
110
111 pub fn plugins(&self) -> &Vec<Arc<Plugin>> {
112 &self.config.plugins
113 }
114
115 pub fn sorted_plugins(&self) -> &Vec<Arc<Plugin>> {
118 &self.config.plugins
120 }
121
122 pub async fn apply(
124 &self,
125 transaction: Transaction,
126 ) -> StateResult<TransactionResult> {
127 let start_time = Instant::now();
128 let initial_step_count = transaction.steps.len();
129 tracing::info!("开始应用事务,初始步骤数: {}", initial_step_count);
130 let result = self.apply_transaction(transaction).await?;
132 let duration = start_time.elapsed();
134 tracing::debug!("事务应用成功,步骤数保持不变,耗时: {:?}", duration);
135 Ok(result)
136 }
137
138 pub async fn filter_transaction(
139 &self,
140 tr: &Transaction,
141 ignore: Option<usize>,
142 ) -> StateResult<bool> {
143 let sorted_plugins = self.sorted_plugins();
145
146 for (i, plugin) in sorted_plugins.iter().enumerate() {
147 if Some(i) != ignore
148 && !plugin.apply_filter_transaction(tr, self).await
149 {
150 return Ok(false);
151 }
152 }
153 Ok(true)
154 }
155
156 pub async fn apply_transaction(
158 &self,
159 root_tr: Transaction,
160 ) -> StateResult<TransactionResult> {
161 tracing::info!("开始应用事务");
162 if !self.filter_transaction(&root_tr, None).await? {
163 tracing::debug!("事务被过滤,返回原始状态");
164 return Ok(TransactionResult {
165 state: self.clone(),
166 transactions: vec![root_tr],
167 });
168 }
169
170 let mut trs = Vec::new();
171 let mut new_state: State = self.apply_inner(&root_tr).await?;
172 trs.push(root_tr);
173 let mut seen: Option<Vec<SeenState>> = None;
174
175 let sorted_plugins = self.sorted_plugins();
177
178 loop {
179 let mut have_new = false;
180 for (i, plugin) in sorted_plugins.iter().enumerate() {
181 let n: usize = seen.as_ref().map(|s| s[i].n).unwrap_or(0);
182 let old_state =
183 seen.as_ref().map(|s| &s[i].state).unwrap_or(self);
184 if n < trs.len() {
185 if let Some(tr) = plugin
186 .apply_append_transaction(
187 &trs[n..],
188 old_state,
189 &new_state,
190 )
191 .await
192 {
193 if new_state.filter_transaction(&tr, Some(i)).await? {
194 if seen.is_none() {
195 let mut s: Vec<SeenState> = Vec::new();
196 for j in 0..sorted_plugins.len() {
197 s.push(if j < i {
198 SeenState {
199 state: new_state.clone(),
200 n: trs.len(),
201 }
202 } else {
203 SeenState { state: self.clone(), n: 0 }
204 });
205 }
206 seen = Some(s);
207 }
208 tracing::debug!(
209 "插件 {} 添加了新事务",
210 plugin.spec.key.1
211 );
212 new_state = new_state.apply_inner(&tr).await?;
213 trs.push(tr);
214 have_new = true;
215 }
216 }
217 }
218 if let Some(seen) = &mut seen {
219 seen[i] =
220 SeenState { state: new_state.clone(), n: trs.len() };
221 }
222 }
223
224 if !have_new {
225 tracing::info!("事务应用完成,共 {} 个步骤", trs.len());
226 return Ok(TransactionResult {
227 state: new_state,
228 transactions: trs,
229 });
230 }
231 }
232 }
233
234 pub async fn apply_inner(
236 &self,
237 tr: &Transaction,
238 ) -> StateResult<State> {
239 let mut config = self.config.as_ref().clone();
240 config.doc = Some(tr.doc.clone());
241 let mut new_instance = State::new(Arc::new(config));
242
243 let sorted_plugins = self.sorted_plugins();
245
246 for plugin in sorted_plugins.iter() {
247 if let Some(field) = &plugin.spec.state {
248 if let Some(old_plugin_state) = self.get_field(&plugin.key) {
249 let value = field
250 .apply(tr, old_plugin_state, self, &new_instance)
251 .await;
252 new_instance.set_field(&plugin.key, value)?;
253 }
254 }
255 }
256 Ok(new_instance)
257 }
258
259 #[must_use]
260 pub fn tr(&self) -> Transaction {
261 Transaction::new(self)
262 }
263
264 pub async fn reconfigure(
265 &self,
266 state_config: StateConfig,
267 ) -> StateResult<State> {
268 tracing::info!("正在重新配置状态");
269 let config = Configuration::new(
270 self.schema(),
271 state_config.plugins.clone(),
272 state_config.doc.clone(),
273 );
274 let mut instance = State::new(Arc::new(config));
275 let mut field_values = Vec::new();
276 for plugin in &instance.config.plugins {
277 if let Some(field) = &plugin.spec.state {
278 let key = plugin.key.clone();
279 tracing::debug!("正在重新配置插件: {}", key);
280 let value = if self.has_field(&key) {
281 if let Some(old_plugin_state) = self.get_field(&key) {
282 old_plugin_state
283 } else {
284 field.init(&state_config, Some(&instance)).await
285 }
286 } else {
287 field.init(&state_config, Some(&instance)).await
288 };
289 field_values.push((key, value));
290 }
291 }
292 for (name, value) in field_values {
293 instance.set_field(&name, value)?;
294 }
295 tracing::info!("状态重新配置完成");
296 Ok(instance)
297 }
298
299 pub fn get_field(
300 &self,
301 name: &str,
302 ) -> Option<PluginState> {
303 self.fields_instances.get(name).cloned()
304 }
305
306 pub fn set_field(
307 &mut self,
308 name: &str,
309 value: PluginState,
310 ) -> StateResult<()> {
311 self.fields_instances.insert(name.to_owned(), value);
312 Ok(())
313 }
314
315 pub fn has_field(
316 &self,
317 name: &str,
318 ) -> bool {
319 self.fields_instances.contains_key(name)
320 }
321}
322pub struct StateConfig {
328 pub schema: Option<Arc<Schema>>,
329 pub doc: Option<Arc<NodePool>>,
330 pub stored_marks: Option<Vec<Mark>>,
331 pub plugins: Option<Vec<Arc<Plugin>>>,
332}
333
334pub struct SeenState {
335 state: State,
336 n: usize,
337}
338#[derive(Debug, Clone)]
339pub struct TransactionResult {
340 pub state: State,
341 pub transactions: Vec<Transaction>,
342}
343#[derive(Clone, Debug)]
349pub struct Configuration {
350 plugins: Vec<Arc<Plugin>>,
351 plugins_by_key: HashMap<String, Arc<Plugin>>,
352 pub doc: Option<Arc<NodePool>>,
353 schema: Arc<Schema>,
354}
355
356impl Configuration {
357 pub fn new(
358 schema: Arc<Schema>,
359 plugins: Option<Vec<Arc<Plugin>>>,
360 doc: Option<Arc<NodePool>>,
361 ) -> Self {
362 let mut config = Configuration {
363 doc,
364 plugins: Vec::new(),
365 plugins_by_key: HashMap::new(),
366 schema,
367 };
368
369 if let Some(plugin_list) = plugins {
370 let mut sorted_plugins = plugin_list;
372 sorted_plugins
373 .sort_by(|a, b| a.spec.priority.cmp(&b.spec.priority));
374
375 for plugin in sorted_plugins {
376 let key = plugin.key.clone();
377 if config.plugins_by_key.contains_key(&key) {
378 panic!("插件请不要重复添加 ({})", key);
379 }
380 config.plugins.push(plugin.clone());
381 config.plugins_by_key.insert(key, plugin);
382 }
383 }
384 config
385 }
386}