1use mf_model::{
2 id_generator::IdGenerator, mark::Mark, node_pool::NodePool, schema::Schema,
3};
4use imbl::HashMap as ImHashMap;
5use std::fmt::{self, Debug};
6use std::{
7 collections::HashMap,
8 sync::{
9 atomic::{AtomicU64, Ordering},
10 Arc,
11 },
12 time::Instant,
13};
14
15use crate::plugin::PluginManager;
16use crate::{ops::GlobalResourceManager, resource::Resource};
17
18use super::{
19 error::{error, StateResult},
20 plugin::{Plugin},
21 transaction::Transaction,
22};
23
/// Process-wide counter from which state version numbers are drawn; it starts at 1.
static VERSION: AtomicU64 = AtomicU64::new(1);

/// Hands out the next state version number.
///
/// The counter is bumped by one on every call and the value it held *before*
/// the bump is returned, so the first call yields `1`, the second `2`, and so on.
pub fn get_state_version() -> u64 {
    AtomicU64::fetch_add(&VERSION, 1, Ordering::SeqCst)
}
/// A state value bundling the shared configuration, the per-plugin field
/// values, the document node pool and a version number.
///
/// Cloning copies the `Arc` handles and the `imbl` map; it does not deep-copy
/// the configuration or the node pool.
#[derive(Clone)]
pub struct State {
    /// Configuration (plugins, schema, resource manager, optional doc) shared via `Arc`.
    pub config: Arc<Configuration>,
    /// Plugin state values, keyed by the owning plugin's `key`.
    pub fields_instances: ImHashMap<String, Arc<dyn Resource>>,
    /// The node pool holding this state's document.
    pub node_pool: Arc<NodePool>,
    /// Version number taken from `get_state_version()` when the state is constructed.
    pub version: u64,
}
41impl Debug for State {
42 fn fmt(
43 &self,
44 f: &mut fmt::Formatter<'_>,
45 ) -> fmt::Result {
46 write!(f, "State {{ 字段数量: {} }}", self.fields_instances.len())
47 }
48}
49
50impl State {
51 pub async fn create(state_config: StateConfig) -> StateResult<State> {
56 tracing::info!("正在创建新的state");
57 let schema: Arc<Schema> = match &state_config.schema {
58 Some(schema) => schema.clone(),
59 None => state_config.schema.clone().ok_or_else(|| {
60 error::schema_error("必须提供结构定义".to_string())
61 })?,
62 };
63 let config = Configuration::new(
64 schema,
65 state_config.plugins.clone(),
66 state_config.doc.clone(),
67 state_config.resource_manager.clone(),
68 )
69 .await?;
70 let mut instance = State::new(Arc::new(config))?;
71 let mut field_values = Vec::new();
72 for plugin in instance.config.plugin_manager.get_sorted_plugins().await
73 {
74 if let Some(field) = &plugin.spec.state_field {
75 tracing::debug!("正在初始化插件状态: {}", plugin.key);
76 let value = field.init(&state_config, &instance).await;
77 field_values.push((plugin.key.clone(), value));
78 }
79 }
80 for (name, value) in field_values {
81 instance.set_field(&name, value)?;
82 }
83 tracing::info!("state创建成功");
84 Ok(instance)
85 }
86 pub fn new(config: Arc<Configuration>) -> StateResult<Self> {
90 let doc: Arc<NodePool> = match &config.doc {
91 Some(doc) => doc.clone(),
92 None => {
93 let id = IdGenerator::get_id();
94 let nodes = config
95 .schema
96 .top_node_type
97 .clone()
98 .ok_or_else(|| {
99 error::schema_error("顶级节点不存在".to_string())
100 })?
101 .create_and_fill(
102 Some(id.clone()),
103 None,
104 vec![],
105 None,
106 &config.schema,
107 );
108 NodePool::from(nodes)
109 },
110 };
111
112 Ok(State {
113 fields_instances: ImHashMap::new(),
114 config,
115 node_pool: doc,
116 version: get_state_version(),
117 })
118 }
119 pub fn doc(&self) -> Arc<NodePool> {
120 Arc::clone(&self.node_pool)
121 }
122 pub fn resource_manager(&self) -> Arc<GlobalResourceManager> {
124 Arc::clone(&self.config.resource_manager)
125 }
126 pub fn schema(&self) -> Arc<Schema> {
128 Arc::clone(&self.config.schema)
129 }
130 pub async fn plugins(&self) -> Vec<Arc<Plugin>> {
132 self.config.plugin_manager.get_sorted_plugins().await
133 }
134
135 pub async fn sorted_plugins(&self) -> Vec<Arc<Plugin>> {
138 self.config.plugin_manager.get_sorted_plugins().await
140 }
141
142 pub async fn apply(
144 &self,
145 transaction: Transaction,
146 ) -> StateResult<TransactionResult> {
147 let start_time = Instant::now();
148 let initial_step_count = transaction.steps.len();
149 tracing::info!("开始应用事务,初始步骤数: {}", initial_step_count);
150 let result = self.apply_transaction(transaction).await?;
152 let duration = start_time.elapsed();
154 tracing::debug!("事务应用成功,步骤数保持不变,耗时: {:?}", duration);
155 Ok(result)
156 }
157
158 pub async fn filter_transaction(
159 &self,
160 tr: &Transaction,
161 ignore: Option<usize>,
162 ) -> StateResult<bool> {
163 let sorted_plugins = self.sorted_plugins().await;
165
166 for (i, plugin) in sorted_plugins.iter().enumerate() {
167 if Some(i) != ignore
168 && !plugin.apply_filter_transaction(tr, self).await
169 {
170 return Ok(false);
171 }
172 }
173 Ok(true)
174 }
175
    /// Applies `root_tr` and then lets plugins append follow-up transactions
    /// until a full pass over the plugins produces nothing new.
    ///
    /// Returns the final state together with every transaction that was
    /// applied, `root_tr` first.
    ///
    /// If the plugins' filters reject `root_tr`, a clone of `self` is returned
    /// and `transactions` still contains `root_tr`.
    /// NOTE(review): a caller cannot tell "rejected" from "applied" by looking
    /// at `transactions` alone — confirm this is intended.
    ///
    /// # Errors
    ///
    /// Propagates errors from `filter_transaction`, `apply_inner` and the
    /// plugins' `apply_append_transaction`.
    pub async fn apply_transaction(
        &self,
        root_tr: Transaction,
    ) -> StateResult<TransactionResult> {
        tracing::info!("开始应用事务");
        if !self.filter_transaction(&root_tr, None).await? {
            tracing::debug!("事务被过滤,返回原始状态");
            return Ok(TransactionResult {
                state: self.clone(),
                transactions: vec![root_tr],
            });
        }

        let mut trs = Vec::new();
        let mut new_state: State = self.apply_inner(&root_tr).await?;
        trs.push(root_tr.clone());
        // Per-plugin bookkeeping, allocated lazily on the first appended
        // transaction: the state each plugin last saw and how many entries of
        // `trs` it has already been shown.
        let mut seen: Option<Vec<SeenState>> = None;

        let sorted_plugins = self.sorted_plugins().await;

        loop {
            let mut have_new = false;
            for (i, plugin) in sorted_plugins.iter().enumerate() {
                // Without bookkeeping every plugin starts from `self` and has
                // seen none of the transactions.
                let n: usize = seen.as_ref().map(|s| s[i].n).unwrap_or(0);
                let old_state =
                    seen.as_ref().map(|s| &s[i].state).unwrap_or(self);
                // Only consult the plugin when there are transactions it has
                // not been shown yet.
                if n < trs.len() {
                    if let Some(mut tr) = plugin
                        .apply_append_transaction(
                            &trs[n..],
                            old_state,
                            &new_state,
                        )
                        .await?
                    {
                        // The appended transaction is filtered by all plugins
                        // except the one that produced it.
                        if new_state.filter_transaction(&tr, Some(i)).await? {
                            tr.set_meta("rootTr", root_tr.clone());
                            if seen.is_none() {
                                // Plugins before `i` were already consulted in
                                // this pass with `new_state` and all current
                                // transactions; the rest have seen nothing.
                                // `trs.len()` is taken before `tr` is pushed.
                                let mut s: Vec<SeenState> = Vec::new();
                                for j in 0..sorted_plugins.len() {
                                    s.push(if j < i {
                                        SeenState {
                                            state: new_state.clone(),
                                            n: trs.len(),
                                        }
                                    } else {
                                        SeenState { state: self.clone(), n: 0 }
                                    });
                                }
                                seen = Some(s);
                            }
                            tracing::debug!(
                                "插件 {} 添加了新事务",
                                plugin.spec.tr.metadata().name.clone()
                            );
                            new_state = new_state.apply_inner(&tr).await?;
                            trs.push(tr);
                            have_new = true;
                        }
                    }
                }
                // Once bookkeeping exists, record what this plugin has now
                // seen, whether or not it appended anything.
                if let Some(seen) = &mut seen {
                    seen[i] =
                        SeenState { state: new_state.clone(), n: trs.len() };
                }
            }

            // A full pass without a new transaction ends the loop. The count
            // logged here is the number of transactions in `trs`.
            if !have_new {
                tracing::info!("事务应用完成,共 {} 个步骤", trs.len());
                return Ok(TransactionResult {
                    state: new_state,
                    transactions: trs,
                });
            }
        }
    }
254
255 pub async fn apply_inner(
257 &self,
258 tr: &Transaction,
259 ) -> StateResult<State> {
260 let mut config = self.config.as_ref().clone();
261 config.doc = Some(tr.doc());
262 let mut new_instance = State::new(Arc::new(config))?;
263
264 let sorted_plugins = self.sorted_plugins().await;
266
267 for plugin in sorted_plugins.iter() {
268 if let Some(field) = &plugin.spec.state_field {
269 if let Some(old_plugin_state) = self.get_field(&plugin.key) {
270 let value = field
271 .apply(tr, old_plugin_state, self, &new_instance)
272 .await;
273 new_instance.set_field(&plugin.key, value)?;
274 }
275 }
276 }
277 Ok(new_instance)
278 }
279
    /// Starts a new [`Transaction`] constructed from this state.
    ///
    /// The result is marked `#[must_use]`: creating a transaction and dropping
    /// it does nothing to the state.
    #[must_use]
    pub fn tr(&self) -> Transaction {
        Transaction::new(self)
    }
284
285 pub async fn reconfigure(
286 &self,
287 state_config: StateConfig,
288 ) -> StateResult<State> {
289 tracing::info!("正在重新配置状态");
290 let config = Configuration::new(
291 self.schema(),
292 state_config.plugins.clone(),
293 state_config.doc.clone(),
294 state_config.resource_manager.clone(),
295 )
296 .await?;
297 let mut instance = State::new(Arc::new(config))?;
298 let mut field_values = Vec::new();
299 for plugin in &instance.config.plugin_manager.get_sorted_plugins().await
300 {
301 if let Some(field) = &plugin.spec.state_field {
302 let key = plugin.key.clone();
303 tracing::debug!("正在重新配置插件: {}", key);
304 let value = if self.has_field(&key) {
305 if let Some(old_plugin_state) = self.get_field(&key) {
306 old_plugin_state
307 } else {
308 field.init(&state_config, &instance).await
309 }
310 } else {
311 field.init(&state_config, &instance).await
312 };
313 field_values.push((key, value));
314 }
315 }
316 for (name, value) in field_values {
317 instance.set_field(&name, value)?;
318 }
319 tracing::info!("状态重新配置完成");
320 Ok(instance)
321 }
322
323 pub fn get_field(
324 &self,
325 name: &str,
326 ) -> Option<Arc<dyn Resource>> {
327 self.fields_instances.get(name).cloned()
328 }
329 pub fn get<T: Resource>(
330 &self,
331 name: &str,
332 ) -> Option<Arc<T>> {
333 self.fields_instances
334 .get(name)
335 .cloned()
336 .and_then(|state| state.downcast_arc::<T>().cloned())
337 }
338
339 fn set_field(
340 &mut self,
341 name: &str,
342 value: Arc<dyn Resource>,
343 ) -> StateResult<()> {
344 self.fields_instances.insert(name.to_owned(), value);
345 Ok(())
346 }
347
348 pub fn has_field(
349 &self,
350 name: &str,
351 ) -> bool {
352 self.fields_instances.contains_key(name)
353 }
354 pub async fn serialize(&self) -> StateResult<StateSerialize> {
356 let mut state_fields: HashMap<String, Vec<u8>> = HashMap::new();
357 for plugin in self.plugins().await {
358 if let Some(state_field) = &plugin.spec.state_field {
359 if let Some(value) = self.get_field(&plugin.key) {
360 if let Some(json) = state_field.serialize(value) {
361 state_fields.insert(plugin.key.clone(), json);
362 }
363 };
364 }
365 }
366 let node_pool_str =
367 serde_json::to_string(&self.doc()).map_err(|e| {
368 error::serialize_error(format!("node pool 序列化失败: {}", e))
369 })?;
370 let state_fields_str =
371 serde_json::to_string(&state_fields).map_err(|e| {
372 error::serialize_error(format!("fields 序列化失败: {}", e))
373 })?;
374 Ok(StateSerialize {
375 state_fields: state_fields_str.as_bytes().to_vec(),
376 node_pool: node_pool_str.as_bytes().to_vec(),
377 })
378 }
379 pub async fn deserialize(
381 s: &StateSerialize,
382 configuration: &Configuration,
383 ) -> StateResult<State> {
384 let state_fields: HashMap<String, Vec<u8>> =
385 serde_json::from_slice(&s.state_fields).map_err(|e| {
386 error::deserialize_error(format!(
387 "state fields 反序列化失败{}",
388 e
389 ))
390 })?;
391 let node_pool: Arc<NodePool> = serde_json::from_slice(&s.node_pool)
392 .map_err(|e| {
393 error::deserialize_error(format!(
394 "node pool 反序列化失败: {}",
395 e
396 ))
397 })?;
398 let mut config = configuration.clone();
399 config.doc = Some(node_pool);
400 let mut state = State::new(Arc::new(config))?;
401
402 let mut map_instances = ImHashMap::new();
403 for plugin in &configuration.plugin_manager.get_sorted_plugins().await {
404 if let Some(state_field) = &plugin.spec.state_field {
405 if let Some(value) = state_fields.get(&plugin.key) {
406 if let Some(p_state) = state_field.deserialize(value) {
407 let key = plugin.key.clone();
408 map_instances.insert(key, p_state);
409 }
410 }
411 }
412 }
413 state.fields_instances = map_instances;
414 Ok(state)
415 }
416}
417
/// Serialized form of a [`State`], as produced by `State::serialize` and
/// consumed by `State::deserialize`.
pub struct StateSerialize {
    /// JSON encoding of a map from plugin key to that plugin's serialized field bytes.
    pub state_fields: Vec<u8>,
    /// JSON encoding of the document node pool.
    pub node_pool: Vec<u8>,
}
422
/// Inputs for `State::create` and `State::reconfigure`.
#[derive(Debug)]
pub struct StateConfig {
    /// Schema to use. `State::create` fails without it; `State::reconfigure`
    /// ignores it and keeps the current state's schema.
    pub schema: Option<Arc<Schema>>,
    /// Existing document to start from. When `None`, `State::new` creates one
    /// from the schema's top node type.
    pub doc: Option<Arc<NodePool>>,
    /// Not read by any code in this file.
    /// NOTE(review): presumably consumed by plugins through `init` — confirm.
    pub stored_marks: Option<Vec<Mark>>,
    /// Plugins to register with the configuration's plugin manager.
    pub plugins: Option<Vec<Arc<Plugin>>>,
    /// Resource manager to share. When `None`, `Configuration::new` creates a
    /// default one.
    pub resource_manager: Option<Arc<GlobalResourceManager>>,
}
436
/// Per-plugin bookkeeping used by `State::apply_transaction` while plugins
/// append follow-up transactions.
pub struct SeenState {
    /// The state the plugin was last consulted with.
    state: State,
    /// How many of the collected transactions the plugin has already been shown.
    n: usize,
}
/// Outcome of `State::apply` / `State::apply_transaction`.
#[derive(Debug, Clone)]
pub struct TransactionResult {
    /// The state after all transactions were applied (or a clone of the
    /// original state when the root transaction was filtered out).
    pub state: State,
    /// The root transaction followed by any transactions appended by plugins.
    pub transactions: Vec<Transaction>,
}
/// Configuration a [`State`] is built on: plugins, optional initial document,
/// schema and resource manager.
#[derive(Clone, Debug)]
pub struct Configuration {
    /// Manager holding the registered plugins and their sorted order.
    pub plugin_manager: PluginManager,
    /// Document to use for the state; `State::new` creates one when this is `None`.
    pub doc: Option<Arc<NodePool>>,
    /// Schema of the document. Private; exposed through `State::schema`.
    schema: Arc<Schema>,
    /// Resource manager shared with the state.
    pub resource_manager: Arc<GlobalResourceManager>,
}
458
459impl Configuration {
460 pub async fn new(
461 schema: Arc<Schema>,
462 plugins: Option<Vec<Arc<Plugin>>>,
463 doc: Option<Arc<NodePool>>,
464 resource_manager: Option<Arc<GlobalResourceManager>>,
465 ) -> StateResult<Self> {
466 let config = Configuration {
467 doc,
468 plugin_manager: PluginManager::new(),
469 schema,
470 resource_manager: resource_manager
471 .unwrap_or_else(|| Arc::new(GlobalResourceManager::default())),
472 };
473
474 if let Some(plugin_list) = plugins {
475 for plugin in plugin_list {
476 config.plugin_manager.register_plugin(plugin).await?
477 }
478 config.plugin_manager.finalize_registration().await?;
479 }
480 Ok(config)
481 }
482}