mf_state/
state.rs

1use mf_model::{
2    id_generator::IdGenerator, mark::Mark, node_pool::NodePool, schema::Schema,
3};
4use imbl::HashMap as ImHashMap;
5use std::fmt::{self, Debug};
6use std::{
7    collections::HashMap,
8    sync::{
9        atomic::{AtomicU64, Ordering},
10        Arc,
11    },
12    time::Instant,
13};
14
15use crate::plugin::PluginManager;
16use crate::{ops::GlobalResourceManager, resource::Resource};
17
18use super::{
19    error::{error, StateResult},
20    plugin::{Plugin},
21    transaction::Transaction,
22};
23
/// Process-wide counter backing [`get_state_version`]; starts at 1.
static VERSION: AtomicU64 = AtomicU64::new(1);

/// Returns the next globally increasing state version number.
///
/// Each call hands out the counter's current value and then advances the
/// counter by one, so the first call in a process yields `1`.
pub fn get_state_version() -> u64 {
    const STEP: u64 = 1;
    VERSION.fetch_add(STEP, Ordering::SeqCst)
}
29/// State 结构体代表编辑器的整体状态
30/// - 配置信息: 存储编辑器的配置信息
31/// - 字段实例: 存储插件的状态数据
32/// - 节点池: 文档的节点池
33/// - 版本号: 状态版本号,用于追踪变更
34#[derive(Clone)]
35pub struct State {
36    pub config: Arc<Configuration>,
37    pub fields_instances: Arc<ImHashMap<String, Arc<dyn Resource>>>,
38    pub node_pool: Arc<NodePool>,
39    pub version: u64,
40}
41impl Debug for State {
42    fn fmt(
43        &self,
44        f: &mut fmt::Formatter<'_>,
45    ) -> fmt::Result {
46        write!(f, "State {{ 字段数量: {} }}", self.fields_instances.len())
47    }
48}
49
50impl State {
51    /// 创建新的编辑器状态
52    /// - 初始化基础配置
53    /// - 初始化所有插件的状态
54    /// - 返回完整的编辑器状态实例
55    #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(state_config), fields(
56        crate_name = "state",
57        has_schema = state_config.schema.is_some(),
58        has_doc = state_config.doc.is_some()
59    )))]
60    pub async fn create(state_config: StateConfig) -> StateResult<State> {
61        tracing::info!("正在创建新的state");
62        let schema: Arc<Schema> = match &state_config.schema {
63            Some(schema) => schema.clone(),
64            None => state_config.schema.clone().ok_or_else(|| {
65                error::schema_error("必须提供结构定义".to_string())
66            })?,
67        };
68        let config = Configuration::new(
69            schema,
70            state_config.plugins.clone(),
71            state_config.doc.clone(),
72            state_config.resource_manager.clone(),
73        )
74        .await?;
75        let mut instance = State::new(Arc::new(config))?;
76        let mut field_values = Vec::new();
77        let mut fields_instances = ImHashMap::new();
78        for plugin in instance.config.plugin_manager.get_sorted_plugins().await
79        {
80            if let Some(field) = &plugin.spec.state_field {
81                tracing::debug!("正在初始化插件状态: {}", plugin.key);
82                let value = field.init_erased(&state_config, &instance).await;
83                field_values.push((plugin.key.clone(), value));
84            }
85        }
86        for (name, value) in field_values {
87            fields_instances.insert(name, value);
88        }
89        instance.fields_instances = Arc::new(fields_instances);
90        tracing::info!("state创建成功");
91        Ok(instance)
92    }
93    /// 根据配置创建新的状态实例
94    /// - 如果没有提供文档,则创建一个空的顶层节点
95    /// - 初始化基本状态信息
96    pub fn new(config: Arc<Configuration>) -> StateResult<Self> {
97        let doc: Arc<NodePool> = match &config.doc {
98            Some(doc) => doc.clone(),
99            None => {
100                let id = IdGenerator::get_id();
101                let factory = config.schema.factory();
102                let nodes = factory.create_top_node(
103                    Some(id.clone()),
104                    None,
105                    vec![],
106                    None,
107                )?;
108                NodePool::from(nodes)
109            },
110        };
111
112        Ok(State {
113            fields_instances: Arc::new(ImHashMap::new()),
114            config,
115            node_pool: doc,
116            version: get_state_version(),
117        })
118    }
119    pub fn doc(&self) -> Arc<NodePool> {
120        Arc::clone(&self.node_pool)
121    }
122    /// 获取资源管理器
123    pub fn resource_manager(&self) -> Arc<GlobalResourceManager> {
124        Arc::clone(&self.config.resource_manager)
125    }
126    /// 获取结构定义
127    pub fn schema(&self) -> Arc<Schema> {
128        Arc::clone(&self.config.schema)
129    }
130    /// 获取插件列表
131    pub async fn plugins(&self) -> Vec<Arc<Plugin>> {
132        self.config.plugin_manager.get_sorted_plugins().await
133    }
134
135    /// 获取已排序的插件列表
136    /// 按照优先级排序,优先级低的先执行
137    pub async fn sorted_plugins(&self) -> Vec<Arc<Plugin>> {
138        // 由于在 Configuration::new 中已经排序,这里直接返回即可
139        self.config.plugin_manager.get_sorted_plugins().await
140    }
141
142    /// 异步应用事务到当前状态
143    #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, transaction), fields(
144        crate_name = "state",
145        tr_id = %transaction.id,
146        step_count = transaction.steps.len(),
147        version = self.version
148    )))]
149    pub async fn apply(
150        self: &Arc<Self>,
151        transaction: Transaction,
152    ) -> StateResult<TransactionResult> {
153        let start_time = Instant::now();
154        let initial_step_count = transaction.steps.len();
155        tracing::info!("开始应用事务,初始步骤数: {}", initial_step_count);
156        // 应用事务并获取结果
157        let result = self.apply_transaction(Arc::new(transaction)).await?;
158        // 检查是否需要重新应用事务
159        let duration = start_time.elapsed();
160        tracing::debug!("事务应用成功,步骤数保持不变,耗时: {:?}", duration);
161        Ok(result)
162    }
163
164    #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, tr), fields(
165        crate_name = "state",
166        tr_id = %tr.id,
167        ignore_plugin = ?ignore
168    )))]
169    pub async fn filter_transaction(
170        self: &Arc<Self>,
171        tr: &Transaction,
172        ignore: Option<usize>,
173    ) -> StateResult<bool> {
174        // 获取已排序的插件列表
175        let sorted_plugins = self.sorted_plugins().await;
176
177        for (i, plugin) in sorted_plugins.iter().enumerate() {
178            if Some(i) != ignore
179                && !plugin.apply_filter_transaction(tr, self).await
180            {
181                return Ok(false);
182            }
183        }
184        Ok(true)
185    }
186
187    /// 异步应用事务到当前状态
188    /// 返回新的状态实例和应用事务的步骤
189    #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, root_tr), fields(
190        crate_name = "state",
191        tr_id = %root_tr.id,
192        step_count = root_tr.steps.len()
193    )))]
194    pub async fn apply_transaction(
195        self: &Arc<Self>,
196        root_tr: Arc<Transaction>,
197    ) -> StateResult<TransactionResult> {
198        tracing::info!("开始应用事务");
199        if !self.filter_transaction(&root_tr, None).await? {
200            tracing::debug!("事务被过滤,返回原始状态");
201            return Ok(TransactionResult {
202                state: self.clone(),
203                transactions: vec![root_tr],
204            });
205        }
206
207        let mut trs = Vec::new();
208        let mut new_state: Arc<State> = self.apply_inner(&root_tr).await?;
209        trs.push(root_tr.clone());
210        let mut seen: Option<Vec<SeenState>> = None;
211
212        // 获取排序后的插件列表
213        let sorted_plugins = self.sorted_plugins().await;
214
215        loop {
216            let mut have_new = false;
217            for (i, plugin) in sorted_plugins.iter().enumerate() {
218                let n: usize = seen.as_ref().map(|s| s[i].n).unwrap_or(0);
219                let old_state =
220                    seen.as_ref().map(|s| &s[i].state).unwrap_or(self);
221                if n < trs.len() {
222                    if let Some(mut tr) = plugin
223                        .apply_append_transaction(
224                            &trs[n..],
225                            old_state,
226                            &new_state,
227                        )
228                        .await?
229                    {
230                        if new_state.filter_transaction(&tr, Some(i)).await? {
231                            tr.set_meta("rootTr", root_tr.clone());
232                            if seen.is_none() {
233                                let mut s: Vec<SeenState> = Vec::new();
234                                for j in 0..sorted_plugins.len() {
235                                    s.push(if j < i {
236                                        SeenState {
237                                            state: new_state.clone(),
238                                            n: trs.len(),
239                                        }
240                                    } else {
241                                        SeenState { state: self.clone(), n: 0 }
242                                    });
243                                }
244                                seen = Some(s);
245                            }
246                            tracing::debug!(
247                                "插件 {} 添加了新事务",
248                                plugin.spec.tr.metadata().name.clone()
249                            );
250                            new_state = new_state.apply_inner(&tr).await?;
251                            trs.push(Arc::new(tr));
252                            have_new = true;
253                        }
254                    }
255                }
256                if let Some(seen) = &mut seen {
257                    seen[i] =
258                        SeenState { state: new_state.clone(), n: trs.len() };
259                }
260            }
261
262            if !have_new {
263                tracing::info!("事务应用完成,共 {} 个步骤", trs.len());
264                return Ok(TransactionResult {
265                    state: new_state,
266                    transactions: trs,
267                });
268            }
269        }
270    }
271
272    /// 异步应用内部事务
273    #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, tr), fields(
274        crate_name = "state",
275        tr_id = %tr.id,
276        step_count = tr.steps.len(),
277        current_version = self.version
278    )))]
279    pub async fn apply_inner(
280        self: &Arc<Self>,
281        tr: &Transaction,
282    ) -> StateResult<Arc<State>> {
283        let mut config = self.config.as_ref().clone();
284        config.doc = Some(tr.doc());
285        let mut new_instance = State::new(Arc::new(config))?;
286        let mut fields_instances = ImHashMap::new();
287        // 获取已排序的插件列表
288        let sorted_plugins = self.sorted_plugins().await;
289
290        for plugin in sorted_plugins.iter() {
291            if let Some(field) = &plugin.spec.state_field {
292                if let Some(old_plugin_state) = self.get_field(&plugin.key) {
293                    let value = field
294                        .apply_erased(tr, old_plugin_state, self, &new_instance)
295                        .await;
296                    fields_instances.insert(plugin.key.clone(), value);
297                }
298            }
299        }
300        new_instance.fields_instances = Arc::new(fields_instances);
301        Ok(Arc::new(new_instance))
302    }
303
304    #[must_use]
305    pub fn tr(&self) -> Transaction {
306        Transaction::new(self)
307    }
308
309    #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, state_config), fields(
310        crate_name = "state",
311        current_version = self.version,
312        has_plugins = state_config.plugins.is_some()
313    )))]
314    pub async fn reconfigure(
315        &self,
316        state_config: StateConfig,
317    ) -> StateResult<State> {
318        tracing::info!("正在重新配置状态");
319        let config = Configuration::new(
320            self.schema(),
321            state_config.plugins.clone(),
322            state_config.doc.clone(),
323            state_config.resource_manager.clone(),
324        )
325        .await?;
326        let mut instance = State::new(Arc::new(config))?;
327        let mut field_values = Vec::new();
328        let mut fields_instances = ImHashMap::new();
329        for plugin in &instance.config.plugin_manager.get_sorted_plugins().await
330        {
331            if let Some(field) = &plugin.spec.state_field {
332                let key = plugin.key.clone();
333                tracing::debug!("正在重新配置插件: {}", key);
334                let value = if self.has_field(&key) {
335                    if let Some(old_plugin_state) = self.get_field(&key) {
336                        old_plugin_state
337                    } else {
338                        field.init_erased(&state_config, &instance).await
339                    }
340                } else {
341                    field.init_erased(&state_config, &instance).await
342                };
343                field_values.push((key, value));
344            }
345        }
346        for (name, value) in field_values {
347            fields_instances.insert(name, value);
348        }
349        instance.fields_instances = Arc::new(fields_instances);
350        tracing::info!("状态重新配置完成");
351        Ok(instance)
352    }
353
354    pub fn get_field(
355        &self,
356        name: &str,
357    ) -> Option<Arc<dyn Resource>> {
358        self.fields_instances.get(name).cloned()
359    }
360    pub fn get<T: Resource>(
361        &self,
362        name: &str,
363    ) -> Option<Arc<T>> {
364        self.fields_instances
365            .get(name)
366            .cloned()
367            .and_then(|state| state.downcast_arc::<T>().cloned())
368    }
369
370    pub fn has_field(
371        &self,
372        name: &str,
373    ) -> bool {
374        self.fields_instances.contains_key(name)
375    }
376    /// 序列化状态
377    #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self), fields(
378        crate_name = "state",
379        version = self.version,
380        doc_size = self.node_pool.size()
381    )))]
382    pub async fn serialize(&self) -> StateResult<StateSerialize> {
383        let mut state_fields: HashMap<String, Vec<u8>> = HashMap::new();
384        for plugin in self.plugins().await {
385            if let Some(state_field) = &plugin.spec.state_field {
386                if let Some(value) = self.get_field(&plugin.key) {
387                    if let Some(json) = state_field.serialize_erased(value) {
388                        state_fields.insert(plugin.key.clone(), json);
389                    }
390                };
391            }
392        }
393        let node_pool_str =
394            serde_json::to_string(&self.doc()).map_err(|e| {
395                error::serialize_error(format!("node pool 序列化失败: {e}"))
396            })?;
397        Ok(StateSerialize {
398            state_fields: state_fields,
399            node_pool: node_pool_str.as_bytes().to_vec(),
400        })
401    }
402    /// 反序列化状态
403    #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(s, configuration), fields(
404        crate_name = "state",
405        state_fields_size = s.state_fields.len(),
406        node_pool_size = s.node_pool.len()
407    )))]
408    pub async fn deserialize(
409        s: &StateSerialize,
410        configuration: &Configuration,
411    ) -> StateResult<State> {
412        let node_pool: Arc<NodePool> = serde_json::from_slice(&s.node_pool)
413            .map_err(|e| {
414                error::deserialize_error(format!("node pool 反序列化失败: {e}"))
415            })?;
416        let mut config = configuration.clone();
417        config.doc = Some(node_pool);
418        let mut state = State::new(Arc::new(config))?;
419
420        let mut map_instances = ImHashMap::new();
421        for plugin in &configuration.plugin_manager.get_sorted_plugins().await {
422            if let Some(state_field) = &plugin.spec.state_field {
423                if let Some(value) = s.state_fields.get(&plugin.key) {
424                    if let Some(p_state) = state_field.deserialize_erased(value)
425                    {
426                        let key = plugin.key.clone();
427                        map_instances.insert(key, p_state);
428                    }
429                }
430            }
431        }
432        state.fields_instances = Arc::new(map_instances);
433        Ok(state)
434    }
435}
436
/// Serialized form of a [`State`].
pub struct StateSerialize {
    /// Serialized plugin state bytes, keyed by plugin key.
    pub state_fields: HashMap<String, Vec<u8>>,
    /// The node pool encoded as JSON bytes.
    pub node_pool: Vec<u8>,
}
441
442/// 状态配置结构体,用于初始化编辑器状态
443/// - 结构定义: 文档结构定义
444/// - 文档内容: 初始文档内容
445/// - 存储标记: 存储的标记
446/// - 插件列表: 插件列表
447#[derive(Debug)]
448pub struct StateConfig {
449    pub schema: Option<Arc<Schema>>,
450    pub doc: Option<Arc<NodePool>>,
451    pub stored_marks: Option<Vec<Mark>>,
452    pub plugins: Option<Vec<Arc<Plugin>>>,
453    pub resource_manager: Option<Arc<GlobalResourceManager>>,
454}
455
456pub struct SeenState {
457    state: Arc<State>,
458    n: usize,
459}
460#[derive(Debug, Clone)]
461pub struct TransactionResult {
462    pub state: Arc<State>,
463    pub transactions: Vec<Arc<Transaction>>,
464}
465/// 配置结构体,存储编辑器的核心配置信息
466/// - 插件列表: 已加载的插件列表
467/// - 插件索引: 插件索引,用于快速查找
468/// - 文档实例: 文档实例
469/// - 结构定义: 文档结构定义
470#[derive(Clone, Debug)]
471pub struct Configuration {
472    pub plugin_manager: PluginManager,
473    pub doc: Option<Arc<NodePool>>,
474    schema: Arc<Schema>,
475    pub resource_manager: Arc<GlobalResourceManager>,
476}
477
478impl Configuration {
479    #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(schema, plugins, doc, resource_manager), fields(
480        crate_name = "state",
481        plugin_count = plugins.as_ref().map(|p| p.len()).unwrap_or(0),
482        has_doc = doc.is_some()
483    )))]
484    pub async fn new(
485        schema: Arc<Schema>,
486        plugins: Option<Vec<Arc<Plugin>>>,
487        doc: Option<Arc<NodePool>>,
488        resource_manager: Option<Arc<GlobalResourceManager>>,
489    ) -> StateResult<Self> {
490        // 使用 Builder 模式构建插件管理器
491        let plugin_manager = if let Some(plugin_list) = plugins {
492            use crate::plugin::PluginManagerBuilder;
493
494            let mut builder = PluginManagerBuilder::new();
495            for plugin in plugin_list {
496                builder.register_plugin(plugin)?;
497            }
498            builder.build()?
499        } else {
500            PluginManager::new()
501        };
502
503        Ok(Configuration {
504            doc,
505            plugin_manager,
506            schema,
507            resource_manager: resource_manager
508                .unwrap_or_else(|| Arc::new(GlobalResourceManager::default())),
509        })
510    }
511}