mf_transform/
transform.rs

1use std::{sync::Arc};
2
3use mf_model::{node_pool::NodePool, schema::Schema, tree::Tree};
4use mf_model::rpds::VectorSync;
5use crate::TransformResult;
6
7use super::step::{Step, StepResult};
8
9/// 延迟计算的文档状态
10#[derive(Debug, Clone)]
11enum LazyDoc {
12    /// 原始文档,未进行任何修改
13    Original(Arc<NodePool>),
14    /// 需要重新计算的状态,包含基础文档和待应用的步骤
15    Pending { base: Arc<NodePool>, steps: VectorSync<Arc<dyn Step>> },
16    /// 已计算的最新状态
17    Computed(Arc<NodePool>),
18}
19
20#[derive(Debug, Clone)]
21pub struct Transform {
22    /// 原始文档状态
23    pub base_doc: Arc<NodePool>,
24    /// 延迟计算的当前文档状态
25    lazy_doc: LazyDoc,
26    /// 文档的草稿状态,用于临时修改 (Copy-on-Write)
27    draft: Option<Tree>,
28    /// 存储所有操作步骤
29    pub steps: VectorSync<Arc<dyn Step>>,
30    /// 存储所有反向操作步骤
31    pub invert_steps: VectorSync<Arc<dyn Step>>,
32    /// 文档的模式定义
33    pub schema: Arc<Schema>,
34    /// 标记是否需要重新计算文档状态
35    needs_recompute: bool,
36}
37
38impl Transform {
39    #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(doc, schema), fields(
40        crate_name = "transform",
41        doc_size = doc.size()
42    )))]
43    pub fn new(
44        doc: Arc<NodePool>,
45        schema: Arc<Schema>,
46    ) -> Transform {
47        Transform {
48            base_doc: doc.clone(),
49            lazy_doc: LazyDoc::Original(doc),
50            draft: None,
51            steps: VectorSync::new_sync(),
52            invert_steps: VectorSync::new_sync(),
53            schema,
54            needs_recompute: false,
55        }
56    }
57
58    /// 获取当前文档状态,使用延迟计算
59    pub fn doc(&self) -> Arc<NodePool> {
60        match &self.lazy_doc {
61            LazyDoc::Original(doc) => doc.clone(),
62            LazyDoc::Computed(doc) => doc.clone(),
63            LazyDoc::Pending { base, steps } => {
64                // 延迟计算:只有在需要时才重新计算文档状态
65                self.compute_doc_state(base.clone(), steps.clone())
66            },
67        }
68    }
69
70    /// 获取草稿状态,使用 Copy-on-Write
71    fn get_draft(&mut self) -> TransformResult<&mut Tree> {
72        if self.draft.is_none() {
73            // 只有在第一次修改时才克隆
74            self.draft = Some(self.base_doc.get_inner().as_ref().clone());
75        }
76        self.draft.as_mut().ok_or_else(|| anyhow::anyhow!("草稿状态未初始化"))
77    }
78
79    #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, step), fields(
80        crate_name = "transform",
81        step_count = self.steps.len()
82    )))]
83    pub fn step(
84        &mut self,
85        step: Arc<dyn Step>,
86    ) -> TransformResult<()> {
87        let schema = self.schema.clone();
88        let draft = self.get_draft()?;
89        let result: StepResult = step.apply(draft, schema)?;
90
91        match result.failed {
92            Some(message) => Err(anyhow::anyhow!(message)),
93            None => {
94                self.add_step(step);
95                Ok(())
96            },
97        }
98    }
99
100    /// 检查文档是否被修改
101    pub fn doc_changed(&self) -> bool {
102        !self.steps.is_empty()
103    }
104
105    /// 添加一个步骤及其结果到事务中
106    fn add_step(
107        &mut self,
108        step: Arc<dyn Step>,
109    ) {
110        // 生成反向步骤
111        if let Some(invert_step) = step.invert(self.base_doc.get_inner()) {
112            self.invert_steps.push_back_mut(invert_step);
113        }
114
115        self.steps.push_back_mut(step.clone());
116
117        // 标记需要延迟重新计算,而不是立即计算
118        self.lazy_doc = LazyDoc::Pending {
119            base: self.base_doc.clone(),
120            steps: self.steps.clone(),
121        };
122        self.needs_recompute = true;
123    }
124
125    /// 强制重新计算文档状态(私有方法)
126    fn compute_doc_state(
127        &self,
128        base: Arc<NodePool>,
129        steps: VectorSync<Arc<dyn Step>>,
130    ) -> Arc<NodePool> {
131        if steps.is_empty() {
132            return base;
133        }
134
135        // 只有在真正需要时才进行计算
136        if let Some(ref draft) = self.draft {
137            NodePool::new(Arc::new(draft.clone()))
138        } else {
139            base
140        }
141    }
142
143    /// 批量应用步骤(优化版本)
144    #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, steps), fields(
145        crate_name = "transform",
146        batch_size = steps.len(),
147        current_step_count = self.steps.len()
148    )))]
149    pub fn apply_steps_batch(
150        &mut self,
151        steps: Vec<Arc<dyn Step>>,
152    ) -> TransformResult<()> {
153        let schema = self.schema.clone();
154        let base_doc_inner = self.base_doc.get_inner().clone();
155
156        // 收集反向步骤
157        let mut new_invert_steps = Vec::new();
158        for step in &steps {
159            if let Some(invert_step) = step.invert(&base_doc_inner) {
160                new_invert_steps.push(invert_step);
161            }
162        }
163
164        let draft = self.get_draft()?;
165
166        // 批量应用,减少中间状态创建
167        for step in &steps {
168            let result = step.apply(draft, schema.clone())?;
169            if let Some(message) = result.failed {
170                return Err(anyhow::anyhow!(message));
171            }
172        }
173
174        // 更新步骤列表
175        for step in steps {
176            self.steps.push_back_mut(step);
177        }
178        for invert_step in new_invert_steps {
179            self.invert_steps.push_back_mut(invert_step);
180        }
181
182        // 只在最后更新状态
183        self.lazy_doc = LazyDoc::Pending {
184            base: self.base_doc.clone(),
185            steps: self.steps.clone(),
186        };
187        self.needs_recompute = true;
188
189        Ok(())
190    }
191
192    /// 提交更改,将当前状态设为新的基础状态
193    /// 保留历史记录(steps 和 invert_steps)以支持回滚功能
194    /// 返回 TransformResult 以处理状态错误
195    pub fn commit(&mut self) -> TransformResult<()> {
196        if self.needs_recompute && self.draft.is_some() {
197            let draft_tree = self
198                .draft
199                .as_ref()
200                .ok_or_else(|| anyhow::anyhow!("尝试提交时草稿状态意外丢失"))?;
201            let new_doc = NodePool::new(Arc::new(draft_tree.clone()));
202            self.base_doc = new_doc.clone();
203            self.lazy_doc = LazyDoc::Computed(new_doc);
204            self.draft = None;
205            // 保留 steps 和 invert_steps 用于历史记录和回滚
206            self.needs_recompute = false;
207        }
208        Ok(())
209    }
210
211    /// 回滚所有未提交的更改
212    pub fn rollback(&mut self) {
213        self.lazy_doc = LazyDoc::Original(self.base_doc.clone());
214        self.draft = None;
215        self.steps = VectorSync::new_sync();
216        self.invert_steps = VectorSync::new_sync();
217        self.needs_recompute = false;
218    }
219
220    /// 清除历史记录(释放内存)
221    pub fn clear_history(&mut self) {
222        self.steps = VectorSync::new_sync();
223        self.invert_steps = VectorSync::new_sync();
224    }
225
226    /// 获取历史记录大小
227    pub fn history_size(&self) -> usize {
228        self.steps.len()
229    }
230}