mf_state/
state.rs

1use mf_model::{
2    id_generator::IdGenerator, mark::Mark, node_pool::NodePool, schema::Schema,
3};
4use std::fmt::{self, Debug};
5use std::{
6    collections::HashMap,
7    sync::{
8        atomic::{AtomicU64, Ordering},
9        Arc,
10    },
11    time::Instant,
12};
13use mf_model::rpds::HashTrieMapSync;
14use crate::plugin::PluginManager;
15use crate::{ops::GlobalResourceManager, resource::Resource};
16
17use super::{
18    error::{error, StateResult},
19    plugin::{Plugin},
20    transaction::Transaction,
21};
22
/// Process-wide counter backing [`get_state_version`]; its initial value is 1.
static VERSION: AtomicU64 = AtomicU64::new(1);

/// Hands out the next value of the global, auto-incrementing version number
/// (used for compatibility).
///
/// The counter is bumped atomically with sequentially consistent ordering and
/// the value it held *before* the increment is returned.
pub fn get_state_version() -> u64 {
    VERSION.fetch_add(1, Ordering::SeqCst)
}
/// `State` represents the overall state of the editor.
/// - config: the editor's configuration
/// - fields_instances: the state data owned by plugins
/// - node_pool: the node pool of the document
/// - version: state version number, used to track changes
#[derive(Clone)]
pub struct State {
    /// Shared configuration (plugin manager, optional document, schema, resource manager).
    pub config: Arc<Configuration>,
    /// Type-erased plugin state values, keyed by plugin key.
    pub fields_instances: Arc<HashTrieMapSync<String, Arc<dyn Resource>>>,
    /// Node pool holding the document of this state.
    pub node_pool: Arc<NodePool>,
    /// Version taken from the global counter when the instance is built.
    pub version: u64,
}
40impl Debug for State {
41    fn fmt(
42        &self,
43        f: &mut fmt::Formatter<'_>,
44    ) -> fmt::Result {
45        write!(
46            f,
47            "State {{ 字段数量: {} }}",
48            self.fields_instances.keys().len()
49        )
50    }
51}
52
53impl State {
54    /// 创建新的编辑器状态
55    /// - 初始化基础配置
56    /// - 初始化所有插件的状态
57    /// - 返回完整的编辑器状态实例
58    #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(state_config), fields(
59        crate_name = "state",
60        has_schema = state_config.schema.is_some(),
61        has_doc = state_config.doc.is_some()
62    )))]
63    pub async fn create(state_config: StateConfig) -> StateResult<State> {
64        tracing::info!("正在创建新的state");
65        let schema: Arc<Schema> = match &state_config.schema {
66            Some(schema) => schema.clone(),
67            None => state_config.schema.clone().ok_or_else(|| {
68                error::schema_error("必须提供结构定义".to_string())
69            })?,
70        };
71        let config = Configuration::new(
72            schema,
73            state_config.plugins.clone(),
74            state_config.doc.clone(),
75            state_config.resource_manager.clone(),
76        )
77        .await?;
78        let mut instance = State::new(Arc::new(config))?;
79        let mut field_values = Vec::new();
80        let mut fields_instances = HashTrieMapSync::new_sync();
81        for plugin in instance.config.plugin_manager.get_sorted_plugins().await
82        {
83            if let Some(field) = &plugin.spec.state_field {
84                tracing::debug!("正在初始化插件状态: {}", plugin.key);
85                let value = field.init_erased(&state_config, &instance).await;
86                field_values.push((plugin.key.clone(), value));
87            }
88        }
89        for (name, value) in field_values {
90            fields_instances.insert_mut(name, value);
91        }
92        instance.fields_instances = Arc::new(fields_instances);
93        tracing::info!("state创建成功");
94        Ok(instance)
95    }
96    /// 根据配置创建新的状态实例
97    /// - 如果没有提供文档,则创建一个空的顶层节点
98    /// - 初始化基本状态信息
99    pub fn new(config: Arc<Configuration>) -> StateResult<Self> {
100        let doc: Arc<NodePool> = match &config.doc {
101            Some(doc) => doc.clone(),
102            None => {
103                let id = IdGenerator::get_id();
104                let factory = config.schema.factory();
105                let nodes = factory.create_top_node(
106                    Some(id.clone()),
107                    None,
108                    vec![],
109                    None,
110                )?;
111                NodePool::from(nodes)
112            },
113        };
114
115        Ok(State {
116            fields_instances: Arc::new(HashTrieMapSync::new_sync()),
117            config,
118            node_pool: doc,
119            version: get_state_version(),
120        })
121    }
122    pub fn doc(&self) -> Arc<NodePool> {
123        Arc::clone(&self.node_pool)
124    }
125    /// 获取资源管理器
126    pub fn resource_manager(&self) -> Arc<GlobalResourceManager> {
127        Arc::clone(&self.config.resource_manager)
128    }
129    /// 获取结构定义
130    pub fn schema(&self) -> Arc<Schema> {
131        Arc::clone(&self.config.schema)
132    }
133    /// 获取插件列表
134    pub async fn plugins(&self) -> Vec<Arc<Plugin>> {
135        self.config.plugin_manager.get_sorted_plugins().await
136    }
137
138    /// 获取已排序的插件列表
139    /// 按照优先级排序,优先级低的先执行
140    pub async fn sorted_plugins(&self) -> Vec<Arc<Plugin>> {
141        // 由于在 Configuration::new 中已经排序,这里直接返回即可
142        self.config.plugin_manager.get_sorted_plugins().await
143    }
144
145    /// 异步应用事务到当前状态
146    #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, transaction), fields(
147        crate_name = "state",
148        tr_id = %transaction.id,
149        step_count = transaction.steps.len(),
150        version = self.version
151    )))]
152    pub async fn apply(
153        self: &Arc<Self>,
154        transaction: Transaction,
155    ) -> StateResult<TransactionResult> {
156        let start_time = Instant::now();
157        let initial_step_count = transaction.steps.len();
158        tracing::info!("开始应用事务,初始步骤数: {}", initial_step_count);
159        // 应用事务并获取结果
160        let result = self.apply_transaction(Arc::new(transaction)).await?;
161        // 检查是否需要重新应用事务
162        let duration = start_time.elapsed();
163        tracing::debug!("事务应用成功,步骤数保持不变,耗时: {:?}", duration);
164        Ok(result)
165    }
166
167    #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, tr), fields(
168        crate_name = "state",
169        tr_id = %tr.id,
170        ignore_plugin = ?ignore
171    )))]
172    pub async fn filter_transaction(
173        self: &Arc<Self>,
174        tr: &Transaction,
175        ignore: Option<usize>,
176    ) -> StateResult<bool> {
177        // 获取已排序的插件列表
178        let sorted_plugins = self.sorted_plugins().await;
179
180        for (i, plugin) in sorted_plugins.iter().enumerate() {
181            if Some(i) != ignore
182                && !plugin.apply_filter_transaction(tr, self).await
183            {
184                return Ok(false);
185            }
186        }
187        Ok(true)
188    }
189
    /// Asynchronously applies a transaction to the current state.
    /// Returns the new state instance together with the list of transactions
    /// that were applied (the root transaction plus any appended by plugins).
    ///
    /// Flow:
    /// - If `filter_transaction` rejects `root_tr`, the current state is
    ///   returned unchanged. NOTE(review): the returned list still contains
    ///   `root_tr` even though it was not applied — confirm callers expect this.
    /// - Otherwise `root_tr` is applied via `apply_inner`, then plugins are
    ///   visited repeatedly; each may append one transaction per visit based
    ///   on the transactions it has not seen yet.
    /// - The loop ends after the first full pass in which no plugin appended
    ///   anything. NOTE(review): no upper bound on the number of passes is
    ///   visible here; termination relies on plugins eventually returning `None`.
    ///
    /// # Errors
    /// Propagates errors from filtering, `apply_inner` and
    /// `apply_append_transaction`.
    #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, root_tr), fields(
        crate_name = "state",
        tr_id = %root_tr.id,
        step_count = root_tr.steps.len()
    )))]
    pub async fn apply_transaction(
        self: &Arc<Self>,
        root_tr: Arc<Transaction>,
    ) -> StateResult<TransactionResult> {
        tracing::info!("开始应用事务");
        if !self.filter_transaction(&root_tr, None).await? {
            tracing::debug!("事务被过滤,返回原始状态");
            return Ok(TransactionResult {
                state: self.clone(),
                transactions: vec![root_tr],
            });
        }

        let mut trs = Vec::new();
        let mut new_state: Arc<State> = self.apply_inner(&root_tr).await?;
        trs.push(root_tr.clone());
        // Per-plugin bookkeeping; stays `None` until the first appended transaction.
        let mut seen: Option<Vec<SeenState>> = None;

        // Fetch the sorted plugin list
        let sorted_plugins = self.sorted_plugins().await;

        loop {
            let mut have_new = false;
            for (i, plugin) in sorted_plugins.iter().enumerate() {
                // `n`: how many entries of `trs` this plugin has already been shown.
                let n: usize = seen.as_ref().map(|s| s[i].n).unwrap_or(0);
                // State the plugin last observed; the original state before any bookkeeping exists.
                let old_state =
                    seen.as_ref().map(|s| &s[i].state).unwrap_or(self);
                if n < trs.len() {
                    // Only the transactions the plugin has not seen yet are passed in.
                    if let Some(mut tr) = plugin
                        .apply_append_transaction(
                            &trs[n..],
                            old_state,
                            &new_state,
                        )
                        .await?
                    {
                        // The appended transaction is filtered by all plugins except its author.
                        if new_state.filter_transaction(&tr, Some(i)).await? {
                            tr.set_meta("rootTr", root_tr.clone());
                            if seen.is_none() {
                                // First appended transaction: plugins before `i` have already
                                // seen everything so far; the rest start from the original state.
                                let mut s: Vec<SeenState> = Vec::new();
                                for j in 0..sorted_plugins.len() {
                                    s.push(if j < i {
                                        SeenState {
                                            state: new_state.clone(),
                                            n: trs.len(),
                                        }
                                    } else {
                                        SeenState { state: self.clone(), n: 0 }
                                    });
                                }
                                seen = Some(s);
                            }
                            tracing::debug!(
                                "插件 {} 添加了新事务",
                                plugin.spec.tr.metadata().name.clone()
                            );
                            new_state = new_state.apply_inner(&tr).await?;
                            trs.push(Arc::new(tr));
                            have_new = true;
                        }
                    }
                }
                // Record that this plugin is now up to date with the current state and list.
                if let Some(seen) = &mut seen {
                    seen[i] =
                        SeenState { state: new_state.clone(), n: trs.len() };
                }
            }

            // A full pass without new transactions means a fixed point was reached.
            if !have_new {
                tracing::info!("事务应用完成,共 {} 个步骤", trs.len());
                return Ok(TransactionResult {
                    state: new_state,
                    transactions: trs,
                });
            }
        }
    }
274
275    /// 异步应用内部事务
276    #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, tr), fields(
277        crate_name = "state",
278        tr_id = %tr.id,
279        step_count = tr.steps.len(),
280        current_version = self.version
281    )))]
282    pub async fn apply_inner(
283        self: &Arc<Self>,
284        tr: &Transaction,
285    ) -> StateResult<Arc<State>> {
286        let mut config = self.config.as_ref().clone();
287        config.doc = Some(tr.doc());
288        let mut new_instance = State::new(Arc::new(config))?;
289        let mut fields_instances = HashTrieMapSync::new_sync();
290        // 获取已排序的插件列表
291        let sorted_plugins = self.sorted_plugins().await;
292
293        for plugin in sorted_plugins.iter() {
294            if let Some(field) = &plugin.spec.state_field {
295                if let Some(old_plugin_state) = self.get_field(&plugin.key) {
296                    let value = field
297                        .apply_erased(tr, old_plugin_state, self, &new_instance)
298                        .await;
299                    fields_instances.insert_mut(plugin.key.clone(), value);
300                }
301            }
302        }
303        new_instance.fields_instances = Arc::new(fields_instances);
304        Ok(Arc::new(new_instance))
305    }
306
    /// Creates a new [`Transaction`] from this state via `Transaction::new`.
    #[must_use]
    pub fn tr(&self) -> Transaction {
        Transaction::new(self)
    }
311
312    #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, state_config), fields(
313        crate_name = "state",
314        current_version = self.version,
315        has_plugins = state_config.plugins.is_some()
316    )))]
317    pub async fn reconfigure(
318        &self,
319        state_config: StateConfig,
320    ) -> StateResult<State> {
321        tracing::info!("正在重新配置状态");
322        let config = Configuration::new(
323            self.schema(),
324            state_config.plugins.clone(),
325            state_config.doc.clone(),
326            state_config.resource_manager.clone(),
327        )
328        .await?;
329        let mut instance = State::new(Arc::new(config))?;
330        let mut field_values = Vec::new();
331        let mut fields_instances = HashTrieMapSync::new_sync();
332        for plugin in &instance.config.plugin_manager.get_sorted_plugins().await
333        {
334            if let Some(field) = &plugin.spec.state_field {
335                let key = plugin.key.clone();
336                tracing::debug!("正在重新配置插件: {}", key);
337                let value = if self.has_field(&key) {
338                    if let Some(old_plugin_state) = self.get_field(&key) {
339                        old_plugin_state
340                    } else {
341                        field.init_erased(&state_config, &instance).await
342                    }
343                } else {
344                    field.init_erased(&state_config, &instance).await
345                };
346                field_values.push((key, value));
347            }
348        }
349        for (name, value) in field_values {
350            fields_instances.insert_mut(name, value);
351        }
352        instance.fields_instances = Arc::new(fields_instances);
353        tracing::info!("状态重新配置完成");
354        Ok(instance)
355    }
356
357    pub fn get_field(
358        &self,
359        name: &str,
360    ) -> Option<Arc<dyn Resource>> {
361        self.fields_instances.get(name).cloned()
362    }
363    pub fn get<T: Resource>(
364        &self,
365        name: &str,
366    ) -> Option<Arc<T>> {
367        self.fields_instances
368            .get(name)
369            .cloned()
370            .and_then(|state| state.downcast_arc::<T>().cloned())
371    }
372
373    pub fn has_field(
374        &self,
375        name: &str,
376    ) -> bool {
377        self.fields_instances.contains_key(name)
378    }
379    /// 序列化状态
380    #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self), fields(
381        crate_name = "state",
382        version = self.version,
383        doc_size = self.node_pool.size()
384    )))]
385    pub async fn serialize(&self) -> StateResult<StateSerialize> {
386        let mut state_fields: HashMap<String, Vec<u8>> = HashMap::new();
387        for plugin in self.plugins().await {
388            if let Some(state_field) = &plugin.spec.state_field {
389                if let Some(value) = self.get_field(&plugin.key) {
390                    if let Some(json) = state_field.serialize_erased(value) {
391                        state_fields.insert(plugin.key.clone(), json);
392                    }
393                };
394            }
395        }
396        let node_pool_str =
397            serde_json::to_string(&self.doc()).map_err(|e| {
398                error::serialize_error(format!("node pool 序列化失败: {e}"))
399            })?;
400        Ok(StateSerialize {
401            state_fields: state_fields,
402            node_pool: node_pool_str.as_bytes().to_vec(),
403        })
404    }
405    /// 反序列化状态
406    #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(s, configuration), fields(
407        crate_name = "state",
408        state_fields_size = s.state_fields.len(),
409        node_pool_size = s.node_pool.len()
410    )))]
411    pub async fn deserialize(
412        s: &StateSerialize,
413        configuration: &Configuration,
414    ) -> StateResult<State> {
415        let node_pool: Arc<NodePool> = serde_json::from_slice(&s.node_pool)
416            .map_err(|e| {
417                error::deserialize_error(format!("node pool 反序列化失败: {e}"))
418            })?;
419        let mut config = configuration.clone();
420        config.doc = Some(node_pool);
421        let mut state = State::new(Arc::new(config))?;
422
423        let mut map_instances = HashTrieMapSync::new_sync();
424        for plugin in &configuration.plugin_manager.get_sorted_plugins().await {
425            if let Some(state_field) = &plugin.spec.state_field {
426                if let Some(value) = s.state_fields.get(&plugin.key) {
427                    if let Some(p_state) = state_field.deserialize_erased(value)
428                    {
429                        let key = plugin.key.clone();
430                        map_instances.insert_mut(key, p_state);
431                    }
432                }
433            }
434        }
435        state.fields_instances = Arc::new(map_instances);
436        Ok(state)
437    }
438}
439
/// Serialized form of a state, as produced by `State::serialize` and
/// consumed by `State::deserialize`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StateSerialize {
    /// Serialized plugin state bytes, keyed by plugin key.
    pub state_fields: HashMap<String, Vec<u8>>,
    /// The document's node pool, encoded as JSON bytes.
    pub node_pool: Vec<u8>,
}
444
/// State configuration, used to initialize the editor state.
/// - schema: the document schema definition
/// - doc: the initial document content
/// - stored_marks: the stored marks
/// - plugins: the plugin list
/// - resource_manager: the resource manager to use
#[derive(Debug)]
pub struct StateConfig {
    /// Document schema; `State::create` fails with a schema error when this is `None`.
    pub schema: Option<Arc<Schema>>,
    /// Initial document; when `None`, `State::new` creates an empty top-level node.
    pub doc: Option<Arc<NodePool>>,
    /// Stored marks (not read anywhere in this file).
    pub stored_marks: Option<Vec<Mark>>,
    /// Plugins to register; `None` yields an empty plugin manager.
    pub plugins: Option<Vec<Arc<Plugin>>>,
    /// Resource manager; `None` makes `Configuration::new` create a default one.
    pub resource_manager: Option<Arc<GlobalResourceManager>>,
}
458
/// Per-plugin bookkeeping used by `State::apply_transaction` while plugins
/// append transactions.
pub struct SeenState {
    /// The state the plugin observed on its most recent visit.
    state: Arc<State>,
    /// Number of transactions in the accumulated list the plugin has already seen.
    n: usize,
}
/// Result of applying a transaction to a state.
#[derive(Debug, Clone)]
pub struct TransactionResult {
    /// The state after all transactions were applied (the original state when
    /// the root transaction was filtered out).
    pub state: Arc<State>,
    /// The root transaction followed by any transactions appended by plugins.
    pub transactions: Vec<Arc<Transaction>>,
}
/// Configuration struct storing the editor's core configuration.
/// - plugin_manager: manager holding the loaded plugins
/// - doc: the document instance, if any
/// - schema: the document schema definition
/// - resource_manager: the resource manager
#[derive(Clone, Debug)]
pub struct Configuration {
    /// Manager of the registered plugins; provides them in sorted order.
    pub plugin_manager: PluginManager,
    /// Document used when building a state; `None` means an empty document is created.
    pub doc: Option<Arc<NodePool>>,
    /// Document schema (private; exposed through `State::schema`).
    schema: Arc<Schema>,
    /// Shared resource manager.
    pub resource_manager: Arc<GlobalResourceManager>,
}
480
481impl Configuration {
482    #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(schema, plugins, doc, resource_manager), fields(
483        crate_name = "state",
484        plugin_count = plugins.as_ref().map(|p| p.len()).unwrap_or(0),
485        has_doc = doc.is_some()
486    )))]
487    pub async fn new(
488        schema: Arc<Schema>,
489        plugins: Option<Vec<Arc<Plugin>>>,
490        doc: Option<Arc<NodePool>>,
491        resource_manager: Option<Arc<GlobalResourceManager>>,
492    ) -> StateResult<Self> {
493        // 使用 Builder 模式构建插件管理器
494        let plugin_manager = if let Some(plugin_list) = plugins {
495            use crate::plugin::PluginManagerBuilder;
496
497            let mut builder = PluginManagerBuilder::new();
498            for plugin in plugin_list {
499                builder.register_plugin(plugin)?;
500            }
501            builder.build()?
502        } else {
503            PluginManager::new()
504        };
505
506        Ok(Configuration {
507            doc,
508            plugin_manager,
509            schema,
510            resource_manager: resource_manager
511                .unwrap_or_else(|| Arc::new(GlobalResourceManager::default())),
512        })
513    }
514}