1use mf_model::{
2 id_generator::IdGenerator, mark::Mark, node_pool::NodePool, schema::Schema,
3};
4use imbl::HashMap as ImHashMap;
5use std::fmt::{self, Debug};
6use std::{
7 collections::HashMap,
8 sync::{
9 atomic::{AtomicU64, Ordering},
10 Arc,
11 },
12 time::Instant,
13};
14
15use crate::plugin::PluginManager;
16use crate::{ops::GlobalResourceManager, resource::Resource};
17
18use super::{
19 error::{error, StateResult},
20 plugin::{Plugin},
21 transaction::Transaction,
22};
23
24static VERSION: AtomicU64 = AtomicU64::new(1);
25pub fn get_state_version() -> u64 {
26 VERSION.fetch_add(1, Ordering::SeqCst)
28}
29#[derive(Clone)]
35pub struct State {
36 pub config: Arc<Configuration>,
37 pub fields_instances: Arc<ImHashMap<String, Arc<dyn Resource>>>,
38 pub node_pool: Arc<NodePool>,
39 pub version: u64,
40}
41impl Debug for State {
42 fn fmt(
43 &self,
44 f: &mut fmt::Formatter<'_>,
45 ) -> fmt::Result {
46 write!(f, "State {{ 字段数量: {} }}", self.fields_instances.len())
47 }
48}
49
50impl State {
51 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(state_config), fields(
56 crate_name = "state",
57 has_schema = state_config.schema.is_some(),
58 has_doc = state_config.doc.is_some()
59 )))]
60 pub async fn create(state_config: StateConfig) -> StateResult<State> {
61 tracing::info!("正在创建新的state");
62 let schema: Arc<Schema> = match &state_config.schema {
63 Some(schema) => schema.clone(),
64 None => state_config.schema.clone().ok_or_else(|| {
65 error::schema_error("必须提供结构定义".to_string())
66 })?,
67 };
68 let config = Configuration::new(
69 schema,
70 state_config.plugins.clone(),
71 state_config.doc.clone(),
72 state_config.resource_manager.clone(),
73 )
74 .await?;
75 let mut instance = State::new(Arc::new(config))?;
76 let mut field_values = Vec::new();
77 let mut fields_instances = ImHashMap::new();
78 for plugin in instance.config.plugin_manager.get_sorted_plugins().await
79 {
80 if let Some(field) = &plugin.spec.state_field {
81 tracing::debug!("正在初始化插件状态: {}", plugin.key);
82 let value = field.init_erased(&state_config, &instance).await;
83 field_values.push((plugin.key.clone(), value));
84 }
85 }
86 for (name, value) in field_values {
87 fields_instances.insert(name, value);
88 }
89 instance.fields_instances = Arc::new(fields_instances);
90 tracing::info!("state创建成功");
91 Ok(instance)
92 }
93 pub fn new(config: Arc<Configuration>) -> StateResult<Self> {
97 let doc: Arc<NodePool> = match &config.doc {
98 Some(doc) => doc.clone(),
99 None => {
100 let id = IdGenerator::get_id();
101 let factory = config.schema.factory();
102 let nodes = factory.create_top_node(
103 Some(id.clone()),
104 None,
105 vec![],
106 None,
107 )?;
108 NodePool::from(nodes)
109 },
110 };
111
112 Ok(State {
113 fields_instances: Arc::new(ImHashMap::new()),
114 config,
115 node_pool: doc,
116 version: get_state_version(),
117 })
118 }
119 pub fn doc(&self) -> Arc<NodePool> {
120 Arc::clone(&self.node_pool)
121 }
122 pub fn resource_manager(&self) -> Arc<GlobalResourceManager> {
124 Arc::clone(&self.config.resource_manager)
125 }
126 pub fn schema(&self) -> Arc<Schema> {
128 Arc::clone(&self.config.schema)
129 }
130 pub async fn plugins(&self) -> Vec<Arc<Plugin>> {
132 self.config.plugin_manager.get_sorted_plugins().await
133 }
134
135 pub async fn sorted_plugins(&self) -> Vec<Arc<Plugin>> {
138 self.config.plugin_manager.get_sorted_plugins().await
140 }
141
142 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, transaction), fields(
144 crate_name = "state",
145 tr_id = %transaction.id,
146 step_count = transaction.steps.len(),
147 version = self.version
148 )))]
149 pub async fn apply(
150 self: &Arc<Self>,
151 transaction: Transaction,
152 ) -> StateResult<TransactionResult> {
153 let start_time = Instant::now();
154 let initial_step_count = transaction.steps.len();
155 tracing::info!("开始应用事务,初始步骤数: {}", initial_step_count);
156 let result = self.apply_transaction(Arc::new(transaction)).await?;
158 let duration = start_time.elapsed();
160 tracing::debug!("事务应用成功,步骤数保持不变,耗时: {:?}", duration);
161 Ok(result)
162 }
163
164 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, tr), fields(
165 crate_name = "state",
166 tr_id = %tr.id,
167 ignore_plugin = ?ignore
168 )))]
169 pub async fn filter_transaction(
170 self: &Arc<Self>,
171 tr: &Transaction,
172 ignore: Option<usize>,
173 ) -> StateResult<bool> {
174 let sorted_plugins = self.sorted_plugins().await;
176
177 for (i, plugin) in sorted_plugins.iter().enumerate() {
178 if Some(i) != ignore
179 && !plugin.apply_filter_transaction(tr, self).await
180 {
181 return Ok(false);
182 }
183 }
184 Ok(true)
185 }
186
187 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, root_tr), fields(
190 crate_name = "state",
191 tr_id = %root_tr.id,
192 step_count = root_tr.steps.len()
193 )))]
194 pub async fn apply_transaction(
195 self: &Arc<Self>,
196 root_tr: Arc<Transaction>,
197 ) -> StateResult<TransactionResult> {
198 tracing::info!("开始应用事务");
199 if !self.filter_transaction(&root_tr, None).await? {
200 tracing::debug!("事务被过滤,返回原始状态");
201 return Ok(TransactionResult {
202 state: self.clone(),
203 transactions: vec![root_tr],
204 });
205 }
206
207 let mut trs = Vec::new();
208 let mut new_state: Arc<State> = self.apply_inner(&root_tr).await?;
209 trs.push(root_tr.clone());
210 let mut seen: Option<Vec<SeenState>> = None;
211
212 let sorted_plugins = self.sorted_plugins().await;
214
215 loop {
216 let mut have_new = false;
217 for (i, plugin) in sorted_plugins.iter().enumerate() {
218 let n: usize = seen.as_ref().map(|s| s[i].n).unwrap_or(0);
219 let old_state =
220 seen.as_ref().map(|s| &s[i].state).unwrap_or(self);
221 if n < trs.len() {
222 if let Some(mut tr) = plugin
223 .apply_append_transaction(
224 &trs[n..],
225 old_state,
226 &new_state,
227 )
228 .await?
229 {
230 if new_state.filter_transaction(&tr, Some(i)).await? {
231 tr.set_meta("rootTr", root_tr.clone());
232 if seen.is_none() {
233 let mut s: Vec<SeenState> = Vec::new();
234 for j in 0..sorted_plugins.len() {
235 s.push(if j < i {
236 SeenState {
237 state: new_state.clone(),
238 n: trs.len(),
239 }
240 } else {
241 SeenState { state: self.clone(), n: 0 }
242 });
243 }
244 seen = Some(s);
245 }
246 tracing::debug!(
247 "插件 {} 添加了新事务",
248 plugin.spec.tr.metadata().name.clone()
249 );
250 new_state = new_state.apply_inner(&tr).await?;
251 trs.push(Arc::new(tr));
252 have_new = true;
253 }
254 }
255 }
256 if let Some(seen) = &mut seen {
257 seen[i] =
258 SeenState { state: new_state.clone(), n: trs.len() };
259 }
260 }
261
262 if !have_new {
263 tracing::info!("事务应用完成,共 {} 个步骤", trs.len());
264 return Ok(TransactionResult {
265 state: new_state,
266 transactions: trs,
267 });
268 }
269 }
270 }
271
272 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, tr), fields(
274 crate_name = "state",
275 tr_id = %tr.id,
276 step_count = tr.steps.len(),
277 current_version = self.version
278 )))]
279 pub async fn apply_inner(
280 self: &Arc<Self>,
281 tr: &Transaction,
282 ) -> StateResult<Arc<State>> {
283 let mut config = self.config.as_ref().clone();
284 config.doc = Some(tr.doc());
285 let mut new_instance = State::new(Arc::new(config))?;
286 let mut fields_instances = ImHashMap::new();
287 let sorted_plugins = self.sorted_plugins().await;
289
290 for plugin in sorted_plugins.iter() {
291 if let Some(field) = &plugin.spec.state_field {
292 if let Some(old_plugin_state) = self.get_field(&plugin.key) {
293 let value = field
294 .apply_erased(tr, old_plugin_state, self, &new_instance)
295 .await;
296 fields_instances.insert(plugin.key.clone(), value);
297 }
298 }
299 }
300 new_instance.fields_instances = Arc::new(fields_instances);
301 Ok(Arc::new(new_instance))
302 }
303
304 #[must_use]
305 pub fn tr(&self) -> Transaction {
306 Transaction::new(self)
307 }
308
309 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, state_config), fields(
310 crate_name = "state",
311 current_version = self.version,
312 has_plugins = state_config.plugins.is_some()
313 )))]
314 pub async fn reconfigure(
315 &self,
316 state_config: StateConfig,
317 ) -> StateResult<State> {
318 tracing::info!("正在重新配置状态");
319 let config = Configuration::new(
320 self.schema(),
321 state_config.plugins.clone(),
322 state_config.doc.clone(),
323 state_config.resource_manager.clone(),
324 )
325 .await?;
326 let mut instance = State::new(Arc::new(config))?;
327 let mut field_values = Vec::new();
328 let mut fields_instances = ImHashMap::new();
329 for plugin in &instance.config.plugin_manager.get_sorted_plugins().await
330 {
331 if let Some(field) = &plugin.spec.state_field {
332 let key = plugin.key.clone();
333 tracing::debug!("正在重新配置插件: {}", key);
334 let value = if self.has_field(&key) {
335 if let Some(old_plugin_state) = self.get_field(&key) {
336 old_plugin_state
337 } else {
338 field.init_erased(&state_config, &instance).await
339 }
340 } else {
341 field.init_erased(&state_config, &instance).await
342 };
343 field_values.push((key, value));
344 }
345 }
346 for (name, value) in field_values {
347 fields_instances.insert(name, value);
348 }
349 instance.fields_instances = Arc::new(fields_instances);
350 tracing::info!("状态重新配置完成");
351 Ok(instance)
352 }
353
354 pub fn get_field(
355 &self,
356 name: &str,
357 ) -> Option<Arc<dyn Resource>> {
358 self.fields_instances.get(name).cloned()
359 }
360 pub fn get<T: Resource>(
361 &self,
362 name: &str,
363 ) -> Option<Arc<T>> {
364 self.fields_instances
365 .get(name)
366 .cloned()
367 .and_then(|state| state.downcast_arc::<T>().cloned())
368 }
369
370 pub fn has_field(
371 &self,
372 name: &str,
373 ) -> bool {
374 self.fields_instances.contains_key(name)
375 }
376 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self), fields(
378 crate_name = "state",
379 version = self.version,
380 doc_size = self.node_pool.size()
381 )))]
382 pub async fn serialize(&self) -> StateResult<StateSerialize> {
383 let mut state_fields: HashMap<String, Vec<u8>> = HashMap::new();
384 for plugin in self.plugins().await {
385 if let Some(state_field) = &plugin.spec.state_field {
386 if let Some(value) = self.get_field(&plugin.key) {
387 if let Some(json) = state_field.serialize_erased(value) {
388 state_fields.insert(plugin.key.clone(), json);
389 }
390 };
391 }
392 }
393 let node_pool_str =
394 serde_json::to_string(&self.doc()).map_err(|e| {
395 error::serialize_error(format!("node pool 序列化失败: {e}"))
396 })?;
397 Ok(StateSerialize {
398 state_fields: state_fields,
399 node_pool: node_pool_str.as_bytes().to_vec(),
400 })
401 }
402 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(s, configuration), fields(
404 crate_name = "state",
405 state_fields_size = s.state_fields.len(),
406 node_pool_size = s.node_pool.len()
407 )))]
408 pub async fn deserialize(
409 s: &StateSerialize,
410 configuration: &Configuration,
411 ) -> StateResult<State> {
412 let node_pool: Arc<NodePool> = serde_json::from_slice(&s.node_pool)
413 .map_err(|e| {
414 error::deserialize_error(format!("node pool 反序列化失败: {e}"))
415 })?;
416 let mut config = configuration.clone();
417 config.doc = Some(node_pool);
418 let mut state = State::new(Arc::new(config))?;
419
420 let mut map_instances = ImHashMap::new();
421 for plugin in &configuration.plugin_manager.get_sorted_plugins().await {
422 if let Some(state_field) = &plugin.spec.state_field {
423 if let Some(value) = s.state_fields.get(&plugin.key) {
424 if let Some(p_state) = state_field.deserialize_erased(value)
425 {
426 let key = plugin.key.clone();
427 map_instances.insert(key, p_state);
428 }
429 }
430 }
431 }
432 state.fields_instances = Arc::new(map_instances);
433 Ok(state)
434 }
435}
436
437pub struct StateSerialize {
438 pub state_fields: HashMap<String, Vec<u8>>,
439 pub node_pool: Vec<u8>,
440}
441
442#[derive(Debug)]
448pub struct StateConfig {
449 pub schema: Option<Arc<Schema>>,
450 pub doc: Option<Arc<NodePool>>,
451 pub stored_marks: Option<Vec<Mark>>,
452 pub plugins: Option<Vec<Arc<Plugin>>>,
453 pub resource_manager: Option<Arc<GlobalResourceManager>>,
454}
455
456pub struct SeenState {
457 state: Arc<State>,
458 n: usize,
459}
460#[derive(Debug, Clone)]
461pub struct TransactionResult {
462 pub state: Arc<State>,
463 pub transactions: Vec<Arc<Transaction>>,
464}
465#[derive(Clone, Debug)]
471pub struct Configuration {
472 pub plugin_manager: PluginManager,
473 pub doc: Option<Arc<NodePool>>,
474 schema: Arc<Schema>,
475 pub resource_manager: Arc<GlobalResourceManager>,
476}
477
478impl Configuration {
479 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(schema, plugins, doc, resource_manager), fields(
480 crate_name = "state",
481 plugin_count = plugins.as_ref().map(|p| p.len()).unwrap_or(0),
482 has_doc = doc.is_some()
483 )))]
484 pub async fn new(
485 schema: Arc<Schema>,
486 plugins: Option<Vec<Arc<Plugin>>>,
487 doc: Option<Arc<NodePool>>,
488 resource_manager: Option<Arc<GlobalResourceManager>>,
489 ) -> StateResult<Self> {
490 let plugin_manager = if let Some(plugin_list) = plugins {
492 use crate::plugin::PluginManagerBuilder;
493
494 let mut builder = PluginManagerBuilder::new();
495 for plugin in plugin_list {
496 builder.register_plugin(plugin)?;
497 }
498 builder.build()?
499 } else {
500 PluginManager::new()
501 };
502
503 Ok(Configuration {
504 doc,
505 plugin_manager,
506 schema,
507 resource_manager: resource_manager
508 .unwrap_or_else(|| Arc::new(GlobalResourceManager::default())),
509 })
510 }
511}