mf_transform/
transform.rs1use std::{sync::Arc};
2
3use mf_model::{node_pool::NodePool, schema::Schema, tree::Tree};
4use mf_model::rpds::VectorSync;
5use crate::TransformResult;
6
7use super::step::{Step, StepResult};
8
9#[derive(Debug, Clone)]
11enum LazyDoc {
12 Original(Arc<NodePool>),
14 Pending { base: Arc<NodePool>, steps: VectorSync<Arc<dyn Step>> },
16 Computed(Arc<NodePool>),
18}
19
20#[derive(Debug, Clone)]
21pub struct Transform {
22 pub base_doc: Arc<NodePool>,
24 lazy_doc: LazyDoc,
26 draft: Option<Tree>,
28 pub steps: VectorSync<Arc<dyn Step>>,
30 pub invert_steps: VectorSync<Arc<dyn Step>>,
32 pub schema: Arc<Schema>,
34 needs_recompute: bool,
36}
37
38impl Transform {
39 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(doc, schema), fields(
40 crate_name = "transform",
41 doc_size = doc.size()
42 )))]
43 pub fn new(
44 doc: Arc<NodePool>,
45 schema: Arc<Schema>,
46 ) -> Transform {
47 Transform {
48 base_doc: doc.clone(),
49 lazy_doc: LazyDoc::Original(doc),
50 draft: None,
51 steps: VectorSync::new_sync(),
52 invert_steps: VectorSync::new_sync(),
53 schema,
54 needs_recompute: false,
55 }
56 }
57
58 pub fn doc(&self) -> Arc<NodePool> {
60 match &self.lazy_doc {
61 LazyDoc::Original(doc) => doc.clone(),
62 LazyDoc::Computed(doc) => doc.clone(),
63 LazyDoc::Pending { base, steps } => {
64 self.compute_doc_state(base.clone(), steps.clone())
66 },
67 }
68 }
69
70 fn get_draft(&mut self) -> TransformResult<&mut Tree> {
72 if self.draft.is_none() {
73 self.draft = Some(self.base_doc.get_inner().as_ref().clone());
75 }
76 self.draft.as_mut().ok_or_else(|| anyhow::anyhow!("草稿状态未初始化"))
77 }
78
79 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, step), fields(
80 crate_name = "transform",
81 step_count = self.steps.len()
82 )))]
83 pub fn step(
84 &mut self,
85 step: Arc<dyn Step>,
86 ) -> TransformResult<()> {
87 let schema = self.schema.clone();
88 let draft = self.get_draft()?;
89 let result: StepResult = step.apply(draft, schema)?;
90
91 match result.failed {
92 Some(message) => Err(anyhow::anyhow!(message)),
93 None => {
94 self.add_step(step);
95 Ok(())
96 },
97 }
98 }
99
100 pub fn doc_changed(&self) -> bool {
102 !self.steps.is_empty()
103 }
104
105 fn add_step(
107 &mut self,
108 step: Arc<dyn Step>,
109 ) {
110 if let Some(invert_step) = step.invert(self.base_doc.get_inner()) {
112 self.invert_steps.push_back_mut(invert_step);
113 }
114
115 self.steps.push_back_mut(step.clone());
116
117 self.lazy_doc = LazyDoc::Pending {
119 base: self.base_doc.clone(),
120 steps: self.steps.clone(),
121 };
122 self.needs_recompute = true;
123 }
124
125 fn compute_doc_state(
127 &self,
128 base: Arc<NodePool>,
129 steps: VectorSync<Arc<dyn Step>>,
130 ) -> Arc<NodePool> {
131 if steps.is_empty() {
132 return base;
133 }
134
135 if let Some(ref draft) = self.draft {
137 NodePool::new(Arc::new(draft.clone()))
138 } else {
139 base
140 }
141 }
142
143 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, steps), fields(
145 crate_name = "transform",
146 batch_size = steps.len(),
147 current_step_count = self.steps.len()
148 )))]
149 pub fn apply_steps_batch(
150 &mut self,
151 steps: Vec<Arc<dyn Step>>,
152 ) -> TransformResult<()> {
153 let schema = self.schema.clone();
154 let base_doc_inner = self.base_doc.get_inner().clone();
155
156 let mut new_invert_steps = Vec::new();
158 for step in &steps {
159 if let Some(invert_step) = step.invert(&base_doc_inner) {
160 new_invert_steps.push(invert_step);
161 }
162 }
163
164 let draft = self.get_draft()?;
165
166 for step in &steps {
168 let result = step.apply(draft, schema.clone())?;
169 if let Some(message) = result.failed {
170 return Err(anyhow::anyhow!(message));
171 }
172 }
173
174 for step in steps {
176 self.steps.push_back_mut(step);
177 }
178 for invert_step in new_invert_steps {
179 self.invert_steps.push_back_mut(invert_step);
180 }
181
182 self.lazy_doc = LazyDoc::Pending {
184 base: self.base_doc.clone(),
185 steps: self.steps.clone(),
186 };
187 self.needs_recompute = true;
188
189 Ok(())
190 }
191
192 pub fn commit(&mut self) -> TransformResult<()> {
196 if self.needs_recompute && self.draft.is_some() {
197 let draft_tree = self
198 .draft
199 .as_ref()
200 .ok_or_else(|| anyhow::anyhow!("尝试提交时草稿状态意外丢失"))?;
201 let new_doc = NodePool::new(Arc::new(draft_tree.clone()));
202 self.base_doc = new_doc.clone();
203 self.lazy_doc = LazyDoc::Computed(new_doc);
204 self.draft = None;
205 self.needs_recompute = false;
207 }
208 Ok(())
209 }
210
211 pub fn rollback(&mut self) {
213 self.lazy_doc = LazyDoc::Original(self.base_doc.clone());
214 self.draft = None;
215 self.steps = VectorSync::new_sync();
216 self.invert_steps = VectorSync::new_sync();
217 self.needs_recompute = false;
218 }
219
220 pub fn clear_history(&mut self) {
222 self.steps = VectorSync::new_sync();
223 self.invert_steps = VectorSync::new_sync();
224 }
225
226 pub fn history_size(&self) -> usize {
228 self.steps.len()
229 }
230}