1use moduforge_model::{
2 id_generator::IdGenerator, mark::Mark, node_pool::NodePool, schema::Schema,
3};
4use im::HashMap as ImHashMap;
5use std::fmt::{self, Debug};
6use std::{
7 collections::HashMap,
8 sync::{
9 atomic::{AtomicU64, Ordering},
10 Arc,
11 },
12 time::Instant,
13};
14
15use crate::{ops::GlobalResourceManager, resource::Resource};
16
17use super::{
18 error::{error, StateResult},
19 plugin::{Plugin},
20 transaction::Transaction,
21};
22
/// Process-wide counter that state version numbers are drawn from; starts at 1.
static VERSION: AtomicU64 = AtomicU64::new(1);

/// Hands out the next state version number.
///
/// The counter is bumped with a single atomic `fetch_add` (`SeqCst`), and the
/// value it held *before* the increment is returned: the first call yields
/// `1`, the second `2`, and so on.
pub fn get_state_version() -> u64 {
    const STEP: u64 = 1;
    VERSION.fetch_add(STEP, Ordering::SeqCst)
}
28#[derive(Clone)]
34pub struct State {
35 pub config: Arc<Configuration>,
36 pub fields_instances: ImHashMap<String, Arc<dyn Resource>>,
37 pub node_pool: Arc<NodePool>,
38 pub version: u64,
39}
40impl Debug for State {
41 fn fmt(
42 &self,
43 f: &mut fmt::Formatter<'_>,
44 ) -> fmt::Result {
45 write!(f, "State {{ 字段数量: {} }}", self.fields_instances.len())
46 }
47}
48
49impl State {
50 pub async fn create(state_config: StateConfig) -> StateResult<State> {
55 tracing::info!("正在创建新的state");
56 let schema: Arc<Schema> = match &state_config.schema {
57 Some(schema) => schema.clone(),
58 None => state_config.schema.clone().ok_or_else(|| {
59 error::schema_error("必须提供结构定义".to_string())
60 })?,
61 };
62 let config = Configuration::new(
63 schema,
64 state_config.plugins.clone(),
65 state_config.doc.clone(),
66 state_config.resource_manager.clone(),
67 )?;
68 let mut instance = State::new(Arc::new(config))?;
69 let mut field_values = Vec::new();
70 for plugin in &instance.config.plugins {
71 if let Some(field) = &plugin.spec.state_field {
72 tracing::debug!("正在初始化插件状态: {}", plugin.key);
73 let value = field.init(&state_config, Some(&instance)).await;
74 field_values.push((plugin.key.clone(), value));
75 }
76 }
77 for (name, value) in field_values {
78 instance.set_field(&name, value)?;
79 }
80 tracing::info!("state创建成功");
81 Ok(instance)
82 }
83 pub fn new(config: Arc<Configuration>) -> StateResult<Self> {
87 let doc: Arc<NodePool> = match &config.doc {
88 Some(doc) => doc.clone(),
89 None => {
90 let id = IdGenerator::get_id();
91 let nodes = config
92 .schema
93 .top_node_type
94 .clone()
95 .ok_or_else(|| {
96 error::schema_error("顶级节点不存在".to_string())
97 })?
98 .create_and_fill(
99 Some(id.clone()),
100 None,
101 vec![],
102 None,
103 &config.schema,
104 );
105 NodePool::from(nodes)
106 },
107 };
108
109 Ok(State {
110 fields_instances: ImHashMap::new(),
111 config,
112 node_pool: doc,
113 version: get_state_version(),
114 })
115 }
116 pub fn doc(&self) -> Arc<NodePool> {
117 Arc::clone(&self.node_pool)
118 }
119 pub fn resource_manager(&self) -> Arc<GlobalResourceManager> {
121 Arc::clone(&self.config.resource_manager)
122 }
123 pub fn schema(&self) -> Arc<Schema> {
125 Arc::clone(&self.config.schema)
126 }
127 pub fn plugins(&self) -> &Vec<Arc<Plugin>> {
129 &self.config.plugins
130 }
131
132 pub fn sorted_plugins(&self) -> &Vec<Arc<Plugin>> {
135 &self.config.plugins
137 }
138
139 pub async fn apply(
141 &self,
142 transaction: Transaction,
143 ) -> StateResult<TransactionResult> {
144 let start_time = Instant::now();
145 let initial_step_count = transaction.steps.len();
146 tracing::info!("开始应用事务,初始步骤数: {}", initial_step_count);
147 let result = self.apply_transaction(transaction).await?;
149 let duration = start_time.elapsed();
151 tracing::debug!("事务应用成功,步骤数保持不变,耗时: {:?}", duration);
152 Ok(result)
153 }
154
155 pub async fn filter_transaction(
156 &self,
157 tr: &Transaction,
158 ignore: Option<usize>,
159 ) -> StateResult<bool> {
160 let sorted_plugins = self.sorted_plugins();
162
163 for (i, plugin) in sorted_plugins.iter().enumerate() {
164 if Some(i) != ignore
165 && !plugin.apply_filter_transaction(tr, self).await
166 {
167 return Ok(false);
168 }
169 }
170 Ok(true)
171 }
172
173 pub async fn apply_transaction(
175 &self,
176 root_tr: Transaction,
177 ) -> StateResult<TransactionResult> {
178 tracing::info!("开始应用事务");
179 if !self.filter_transaction(&root_tr, None).await? {
180 tracing::debug!("事务被过滤,返回原始状态");
181 return Ok(TransactionResult {
182 state: self.clone(),
183 transactions: vec![root_tr],
184 });
185 }
186
187 let mut trs = Vec::new();
188 let mut new_state: State = self.apply_inner(&root_tr).await?;
189 trs.push(root_tr.clone());
190 let mut seen: Option<Vec<SeenState>> = None;
191
192 let sorted_plugins = self.sorted_plugins();
194
195 loop {
196 let mut have_new = false;
197 for (i, plugin) in sorted_plugins.iter().enumerate() {
198 let n: usize = seen.as_ref().map(|s| s[i].n).unwrap_or(0);
199 let old_state =
200 seen.as_ref().map(|s| &s[i].state).unwrap_or(self);
201 if n < trs.len() {
202 if let Some(mut tr) = plugin
203 .apply_append_transaction(
204 &trs[n..],
205 old_state,
206 &new_state,
207 )
208 .await?
209 {
210 if new_state.filter_transaction(&tr, Some(i)).await? {
211 tr.set_meta("appendedTransaction", root_tr.clone());
212 if seen.is_none() {
213 let mut s: Vec<SeenState> = Vec::new();
214 for j in 0..sorted_plugins.len() {
215 s.push(if j < i {
216 SeenState {
217 state: new_state.clone(),
218 n: trs.len(),
219 }
220 } else {
221 SeenState { state: self.clone(), n: 0 }
222 });
223 }
224 seen = Some(s);
225 }
226 tracing::debug!(
227 "插件 {} 添加了新事务",
228 plugin.spec.key.1
229 );
230 new_state = new_state.apply_inner(&tr).await?;
231 trs.push(tr);
232 have_new = true;
233 }
234 }
235 }
236 if let Some(seen) = &mut seen {
237 seen[i] =
238 SeenState { state: new_state.clone(), n: trs.len() };
239 }
240 }
241
242 if !have_new {
243 tracing::info!("事务应用完成,共 {} 个步骤", trs.len());
244 return Ok(TransactionResult {
245 state: new_state,
246 transactions: trs,
247 });
248 }
249 }
250 }
251
252 pub async fn apply_inner(
254 &self,
255 tr: &Transaction,
256 ) -> StateResult<State> {
257 let mut config = self.config.as_ref().clone();
258 config.doc = Some(tr.doc());
259 let mut new_instance = State::new(Arc::new(config))?;
260
261 let sorted_plugins = self.sorted_plugins();
263
264 for plugin in sorted_plugins.iter() {
265 if let Some(field) = &plugin.spec.state_field {
266 if let Some(old_plugin_state) = self.get_field(&plugin.key) {
267 let value = field
268 .apply(tr, old_plugin_state, self, &new_instance)
269 .await;
270 new_instance.set_field(&plugin.key, value)?;
271 }
272 }
273 }
274 Ok(new_instance)
275 }
276
277 #[must_use]
278 pub fn tr(&self) -> Transaction {
279 Transaction::new(self)
280 }
281
282 pub async fn reconfigure(
283 &self,
284 state_config: StateConfig,
285 ) -> StateResult<State> {
286 tracing::info!("正在重新配置状态");
287 let config = Configuration::new(
288 self.schema(),
289 state_config.plugins.clone(),
290 state_config.doc.clone(),
291 state_config.resource_manager.clone(),
292 )?;
293 let mut instance = State::new(Arc::new(config))?;
294 let mut field_values = Vec::new();
295 for plugin in &instance.config.plugins {
296 if let Some(field) = &plugin.spec.state_field {
297 let key = plugin.key.clone();
298 tracing::debug!("正在重新配置插件: {}", key);
299 let value = if self.has_field(&key) {
300 if let Some(old_plugin_state) = self.get_field(&key) {
301 old_plugin_state
302 } else {
303 field.init(&state_config, Some(&instance)).await
304 }
305 } else {
306 field.init(&state_config, Some(&instance)).await
307 };
308 field_values.push((key, value));
309 }
310 }
311 for (name, value) in field_values {
312 instance.set_field(&name, value)?;
313 }
314 tracing::info!("状态重新配置完成");
315 Ok(instance)
316 }
317
318 pub fn get_field(
319 &self,
320 name: &str,
321 ) -> Option<Arc<dyn Resource>> {
322 self.fields_instances.get(name).cloned()
323 }
324 pub fn get<T: Resource>(
325 &self,
326 name: &str,
327 ) -> Option<Arc<T>> {
328 self.fields_instances
329 .get(name)
330 .cloned()
331 .and_then(|state| state.downcast_arc::<T>().cloned())
332 }
333
334 fn set_field(
335 &mut self,
336 name: &str,
337 value: Arc<dyn Resource>,
338 ) -> StateResult<()> {
339 self.fields_instances.insert(name.to_owned(), value);
340 Ok(())
341 }
342
343 pub fn has_field(
344 &self,
345 name: &str,
346 ) -> bool {
347 self.fields_instances.contains_key(name)
348 }
349 pub fn serialize(&self) -> StateResult<StateSerialize> {
351 let mut state_fields: HashMap<String, Vec<u8>> = HashMap::new();
352 for plugin in self.plugins() {
353 if let Some(state_field) = &plugin.spec.state_field {
354 if let Some(value) = self.get_field(&plugin.key) {
355 if let Some(json) = state_field.serialize(value) {
356 state_fields.insert(plugin.key.clone(), json);
357 }
358 };
359 }
360 }
361 let node_pool_str =
362 serde_json::to_string(&self.doc()).map_err(|e| {
363 error::serialize_error(format!("node pool 序列化失败: {}", e))
364 })?;
365 let state_fields_str =
366 serde_json::to_string(&state_fields).map_err(|e| {
367 error::serialize_error(format!("fields 序列化失败: {}", e))
368 })?;
369 Ok(StateSerialize {
370 state_fields: state_fields_str.as_bytes().to_vec(),
371 node_pool: node_pool_str.as_bytes().to_vec(),
372 })
373 }
374 pub fn deserialize(
376 s: &StateSerialize,
377 configuration: &Configuration,
378 ) -> StateResult<State> {
379 let state_fields: HashMap<String, Vec<u8>> =
380 serde_json::from_slice(&s.state_fields).map_err(|e| {
381 error::deserialize_error(format!(
382 "state fields 反序列化失败{}",
383 e
384 ))
385 })?;
386 let node_pool: Arc<NodePool> = serde_json::from_slice(&s.node_pool)
387 .map_err(|e| {
388 error::deserialize_error(format!(
389 "node pool 反序列化失败: {}",
390 e
391 ))
392 })?;
393 let mut config = configuration.clone();
394 config.doc = Some(node_pool);
395 let mut state = State::new(Arc::new(config))?;
396
397 let mut map_instances = ImHashMap::new();
398 for plugin in &configuration.plugins {
399 if let Some(state_field) = &plugin.spec.state_field {
400 if let Some(value) = state_fields.get(&plugin.key) {
401 if let Some(p_state) = state_field.deserialize(value) {
402 let key = plugin.key.clone();
403 map_instances.insert(key, p_state);
404 }
405 }
406 }
407 }
408 state.fields_instances = map_instances;
409 Ok(state)
410 }
411}
412
/// Serialized form of a [`State`], as produced by `State::serialize` and
/// consumed by `State::deserialize`.
pub struct StateSerialize {
    /// JSON encoding of the `plugin key -> serialized plugin state` map.
    pub state_fields: Vec<u8>,
    /// JSON encoding of the document node pool.
    pub node_pool: Vec<u8>,
}
417
418#[derive(Debug)]
424pub struct StateConfig {
425 pub schema: Option<Arc<Schema>>,
426 pub doc: Option<Arc<NodePool>>,
427 pub stored_marks: Option<Vec<Mark>>,
428 pub plugins: Option<Vec<Arc<Plugin>>>,
429 pub resource_manager: Option<Arc<GlobalResourceManager>>,
430}
431
432pub struct SeenState {
433 state: State,
434 n: usize,
435}
436#[derive(Debug, Clone)]
437pub struct TransactionResult {
438 pub state: State,
439 pub transactions: Vec<Transaction>,
440}
441#[derive(Clone, Debug)]
447pub struct Configuration {
448 plugins: Vec<Arc<Plugin>>,
449 plugins_by_key: HashMap<String, Arc<Plugin>>,
450 pub doc: Option<Arc<NodePool>>,
451 schema: Arc<Schema>,
452 pub resource_manager: Arc<GlobalResourceManager>,
453}
454
455impl Configuration {
456 pub fn new(
457 schema: Arc<Schema>,
458 plugins: Option<Vec<Arc<Plugin>>>,
459 doc: Option<Arc<NodePool>>,
460 resource_manager: Option<Arc<GlobalResourceManager>>,
461 ) -> StateResult<Self> {
462 let mut config = Configuration {
463 doc,
464 plugins: Vec::new(),
465 plugins_by_key: HashMap::new(),
466 schema,
467 resource_manager: resource_manager
468 .unwrap_or_else(|| Arc::new(GlobalResourceManager::default())),
469 };
470
471 if let Some(plugin_list) = plugins {
472 let mut sorted_plugins = plugin_list;
474 sorted_plugins
475 .sort_by(|a, b| a.spec.priority.cmp(&b.spec.priority));
476
477 for plugin in sorted_plugins {
478 let key = plugin.key.clone();
479 if config.plugins_by_key.contains_key(&key) {
480 return Err(anyhow::anyhow!(format!(
481 "插件请不要重复添加{:?}",
482 key
483 )));
484 }
485 config.plugins.push(plugin.clone());
486 config.plugins_by_key.insert(key, plugin);
487 }
488 }
489 Ok(config)
490 }
491}