1use mf_model::{
2 id_generator::IdGenerator, mark::Mark, node_pool::NodePool, schema::Schema,
3};
4use imbl::HashMap as ImHashMap;
5use std::fmt::{self, Debug};
6use std::{
7 collections::HashMap,
8 sync::{
9 atomic::{AtomicU64, Ordering},
10 Arc,
11 },
12 time::Instant,
13};
14
15use crate::plugin::PluginManager;
16use crate::{ops::GlobalResourceManager, resource::Resource};
17
18use super::{
19 error::{error, StateResult},
20 plugin::{Plugin},
21 transaction::Transaction,
22};
23
24static VERSION: AtomicU64 = AtomicU64::new(1);
25pub fn get_state_version() -> u64 {
26 VERSION.fetch_add(1, Ordering::SeqCst)
28}
29#[derive(Clone)]
35pub struct State {
36 pub config: Arc<Configuration>,
37 pub fields_instances: Arc<ImHashMap<String, Arc<dyn Resource>>>,
38 pub node_pool: Arc<NodePool>,
39 pub version: u64,
40}
41impl Debug for State {
42 fn fmt(
43 &self,
44 f: &mut fmt::Formatter<'_>,
45 ) -> fmt::Result {
46 write!(f, "State {{ 字段数量: {} }}", self.fields_instances.len())
47 }
48}
49
50impl State {
51 pub async fn create(state_config: StateConfig) -> StateResult<State> {
56 tracing::info!("正在创建新的state");
57 let schema: Arc<Schema> = match &state_config.schema {
58 Some(schema) => schema.clone(),
59 None => state_config.schema.clone().ok_or_else(|| {
60 error::schema_error("必须提供结构定义".to_string())
61 })?,
62 };
63 let config = Configuration::new(
64 schema,
65 state_config.plugins.clone(),
66 state_config.doc.clone(),
67 state_config.resource_manager.clone(),
68 )
69 .await?;
70 let mut instance = State::new(Arc::new(config))?;
71 let mut field_values = Vec::new();
72 let mut fields_instances = ImHashMap::new();
73 for plugin in instance.config.plugin_manager.get_sorted_plugins().await
74 {
75 if let Some(field) = &plugin.spec.state_field {
76 tracing::debug!("正在初始化插件状态: {}", plugin.key);
77 let value = field.init_erased(&state_config, &instance).await;
78 field_values.push((plugin.key.clone(), value));
79 }
80 }
81 for (name, value) in field_values {
82 fields_instances.insert(name, value);
83 }
84 instance.fields_instances = Arc::new(fields_instances);
85 tracing::info!("state创建成功");
86 Ok(instance)
87 }
88 pub fn new(config: Arc<Configuration>) -> StateResult<Self> {
92 let doc: Arc<NodePool> = match &config.doc {
93 Some(doc) => doc.clone(),
94 None => {
95 let id = IdGenerator::get_id();
96 let nodes = config
97 .schema
98 .top_node_type
99 .clone()
100 .ok_or_else(|| {
101 error::schema_error("顶级节点不存在".to_string())
102 })?
103 .create_and_fill(
104 Some(id.clone()),
105 None,
106 vec![],
107 None,
108 &config.schema,
109 );
110 NodePool::from(nodes)
111 },
112 };
113
114 Ok(State {
115 fields_instances: Arc::new(ImHashMap::new()),
116 config,
117 node_pool: doc,
118 version: get_state_version(),
119 })
120 }
121 pub fn doc(&self) -> Arc<NodePool> {
122 Arc::clone(&self.node_pool)
123 }
124 pub fn resource_manager(&self) -> Arc<GlobalResourceManager> {
126 Arc::clone(&self.config.resource_manager)
127 }
128 pub fn schema(&self) -> Arc<Schema> {
130 Arc::clone(&self.config.schema)
131 }
132 pub async fn plugins(&self) -> Vec<Arc<Plugin>> {
134 self.config.plugin_manager.get_sorted_plugins().await
135 }
136
137 pub async fn sorted_plugins(&self) -> Vec<Arc<Plugin>> {
140 self.config.plugin_manager.get_sorted_plugins().await
142 }
143
144 pub async fn apply(
146 self: &Arc<Self>,
147 transaction: Transaction,
148 ) -> StateResult<TransactionResult> {
149 let start_time = Instant::now();
150 let initial_step_count = transaction.steps.len();
151 tracing::info!("开始应用事务,初始步骤数: {}", initial_step_count);
152 let result = self.apply_transaction(Arc::new(transaction)).await?;
154 let duration = start_time.elapsed();
156 tracing::debug!("事务应用成功,步骤数保持不变,耗时: {:?}", duration);
157 Ok(result)
158 }
159
160 pub async fn filter_transaction(
161 self: &Arc<Self>,
162 tr: &Transaction,
163 ignore: Option<usize>,
164 ) -> StateResult<bool> {
165 let sorted_plugins = self.sorted_plugins().await;
167
168 for (i, plugin) in sorted_plugins.iter().enumerate() {
169 if Some(i) != ignore
170 && !plugin.apply_filter_transaction(tr, self).await
171 {
172 return Ok(false);
173 }
174 }
175 Ok(true)
176 }
177
178 pub async fn apply_transaction(
181 self: &Arc<Self>,
182 root_tr: Arc<Transaction>,
183 ) -> StateResult<TransactionResult> {
184 tracing::info!("开始应用事务");
185 if !self.filter_transaction(&root_tr, None).await? {
186 tracing::debug!("事务被过滤,返回原始状态");
187 return Ok(TransactionResult {
188 state: self.clone(),
189 transactions: vec![root_tr],
190 });
191 }
192
193 let mut trs = Vec::new();
194 let mut new_state: Arc<State> = self.apply_inner(&root_tr).await?;
195 trs.push(root_tr.clone());
196 let mut seen: Option<Vec<SeenState>> = None;
197
198 let sorted_plugins = self.sorted_plugins().await;
200
201 loop {
202 let mut have_new = false;
203 for (i, plugin) in sorted_plugins.iter().enumerate() {
204 let n: usize = seen.as_ref().map(|s| s[i].n).unwrap_or(0);
205 let old_state =
206 seen.as_ref().map(|s| &s[i].state).unwrap_or(self);
207 if n < trs.len() {
208 if let Some(mut tr) = plugin
209 .apply_append_transaction(
210 &trs[n..],
211 old_state,
212 &new_state,
213 )
214 .await?
215 {
216 if new_state.filter_transaction(&tr, Some(i)).await? {
217 tr.set_meta("rootTr", root_tr.clone());
218 if seen.is_none() {
219 let mut s: Vec<SeenState> = Vec::new();
220 for j in 0..sorted_plugins.len() {
221 s.push(if j < i {
222 SeenState {
223 state: new_state.clone(),
224 n: trs.len(),
225 }
226 } else {
227 SeenState { state: self.clone(), n: 0 }
228 });
229 }
230 seen = Some(s);
231 }
232 tracing::debug!(
233 "插件 {} 添加了新事务",
234 plugin.spec.tr.metadata().name.clone()
235 );
236 new_state = new_state.apply_inner(&tr).await?;
237 trs.push(Arc::new(tr));
238 have_new = true;
239 }
240 }
241 }
242 if let Some(seen) = &mut seen {
243 seen[i] =
244 SeenState { state: new_state.clone(), n: trs.len() };
245 }
246 }
247
248 if !have_new {
249 tracing::info!("事务应用完成,共 {} 个步骤", trs.len());
250 return Ok(TransactionResult {
251 state: new_state,
252 transactions: trs,
253 });
254 }
255 }
256 }
257
258 pub async fn apply_inner(
260 self: &Arc<Self>,
261 tr: &Transaction,
262 ) -> StateResult<Arc<State>> {
263 let mut config = self.config.as_ref().clone();
264 config.doc = Some(tr.doc());
265 let mut new_instance = State::new(Arc::new(config))?;
266 let mut fields_instances = ImHashMap::new();
267 let sorted_plugins = self.sorted_plugins().await;
269
270 for plugin in sorted_plugins.iter() {
271 if let Some(field) = &plugin.spec.state_field {
272 if let Some(old_plugin_state) = self.get_field(&plugin.key) {
273 let value = field
274 .apply_erased(tr, old_plugin_state, self, &new_instance)
275 .await;
276 fields_instances.insert(plugin.key.clone(), value);
277 }
278 }
279 }
280 new_instance.fields_instances = Arc::new(fields_instances);
281 Ok(Arc::new(new_instance))
282 }
283
284 #[must_use]
285 pub fn tr(&self) -> Transaction {
286 Transaction::new(self)
287 }
288
289 pub async fn reconfigure(
290 &self,
291 state_config: StateConfig,
292 ) -> StateResult<State> {
293 tracing::info!("正在重新配置状态");
294 let config = Configuration::new(
295 self.schema(),
296 state_config.plugins.clone(),
297 state_config.doc.clone(),
298 state_config.resource_manager.clone(),
299 )
300 .await?;
301 let mut instance = State::new(Arc::new(config))?;
302 let mut field_values = Vec::new();
303 let mut fields_instances = ImHashMap::new();
304 for plugin in &instance.config.plugin_manager.get_sorted_plugins().await
305 {
306 if let Some(field) = &plugin.spec.state_field {
307 let key = plugin.key.clone();
308 tracing::debug!("正在重新配置插件: {}", key);
309 let value = if self.has_field(&key) {
310 if let Some(old_plugin_state) = self.get_field(&key) {
311 old_plugin_state
312 } else {
313 field.init_erased(&state_config, &instance).await
314 }
315 } else {
316 field.init_erased(&state_config, &instance).await
317 };
318 field_values.push((key, value));
319 }
320 }
321 for (name, value) in field_values {
322 fields_instances.insert(name, value);
323 }
324 instance.fields_instances = Arc::new(fields_instances);
325 tracing::info!("状态重新配置完成");
326 Ok(instance)
327 }
328
329 pub fn get_field(
330 &self,
331 name: &str,
332 ) -> Option<Arc<dyn Resource>> {
333 self.fields_instances.get(name).cloned()
334 }
335 pub fn get<T: Resource>(
336 &self,
337 name: &str,
338 ) -> Option<Arc<T>> {
339 self.fields_instances
340 .get(name)
341 .cloned()
342 .and_then(|state| state.downcast_arc::<T>().cloned())
343 }
344
345 pub fn has_field(
346 &self,
347 name: &str,
348 ) -> bool {
349 self.fields_instances.contains_key(name)
350 }
351 pub async fn serialize(&self) -> StateResult<StateSerialize> {
353 let mut state_fields: HashMap<String, Vec<u8>> = HashMap::new();
354 for plugin in self.plugins().await {
355 if let Some(state_field) = &plugin.spec.state_field {
356 if let Some(value) = self.get_field(&plugin.key) {
357 if let Some(json) = state_field.serialize_erased(value) {
358 state_fields.insert(plugin.key.clone(), json);
359 }
360 };
361 }
362 }
363 let node_pool_str =
364 serde_json::to_string(&self.doc()).map_err(|e| {
365 error::serialize_error(format!("node pool 序列化失败: {e}"))
366 })?;
367 let state_fields_str =
368 serde_json::to_string(&state_fields).map_err(|e| {
369 error::serialize_error(format!("fields 序列化失败: {e}"))
370 })?;
371 Ok(StateSerialize {
372 state_fields: state_fields_str.as_bytes().to_vec(),
373 node_pool: node_pool_str.as_bytes().to_vec(),
374 })
375 }
376 pub async fn deserialize(
378 s: &StateSerialize,
379 configuration: &Configuration,
380 ) -> StateResult<State> {
381 let state_fields: HashMap<String, Vec<u8>> =
382 serde_json::from_slice(&s.state_fields).map_err(|e| {
383 error::deserialize_error(format!(
384 "state fields 反序列化失败{e}"
385 ))
386 })?;
387 let node_pool: Arc<NodePool> = serde_json::from_slice(&s.node_pool)
388 .map_err(|e| {
389 error::deserialize_error(format!("node pool 反序列化失败: {e}"))
390 })?;
391 let mut config = configuration.clone();
392 config.doc = Some(node_pool);
393 let mut state = State::new(Arc::new(config))?;
394
395 let mut map_instances = ImHashMap::new();
396 for plugin in &configuration.plugin_manager.get_sorted_plugins().await {
397 if let Some(state_field) = &plugin.spec.state_field {
398 if let Some(value) = state_fields.get(&plugin.key) {
399 if let Some(p_state) = state_field.deserialize_erased(value)
400 {
401 let key = plugin.key.clone();
402 map_instances.insert(key, p_state);
403 }
404 }
405 }
406 }
407 state.fields_instances = Arc::new(map_instances);
408 Ok(state)
409 }
410}
411
412pub struct StateSerialize {
413 pub state_fields: Vec<u8>,
414 pub node_pool: Vec<u8>,
415}
416
417#[derive(Debug)]
423pub struct StateConfig {
424 pub schema: Option<Arc<Schema>>,
425 pub doc: Option<Arc<NodePool>>,
426 pub stored_marks: Option<Vec<Mark>>,
427 pub plugins: Option<Vec<Arc<Plugin>>>,
428 pub resource_manager: Option<Arc<GlobalResourceManager>>,
429}
430
431pub struct SeenState {
432 state: Arc<State>,
433 n: usize,
434}
435#[derive(Debug, Clone)]
436pub struct TransactionResult {
437 pub state: Arc<State>,
438 pub transactions: Vec<Arc<Transaction>>,
439}
440#[derive(Clone, Debug)]
446pub struct Configuration {
447 pub plugin_manager: PluginManager,
448 pub doc: Option<Arc<NodePool>>,
449 schema: Arc<Schema>,
450 pub resource_manager: Arc<GlobalResourceManager>,
451}
452
453impl Configuration {
454 pub async fn new(
455 schema: Arc<Schema>,
456 plugins: Option<Vec<Arc<Plugin>>>,
457 doc: Option<Arc<NodePool>>,
458 resource_manager: Option<Arc<GlobalResourceManager>>,
459 ) -> StateResult<Self> {
460 let plugin_manager = if let Some(plugin_list) = plugins {
462 use crate::plugin::PluginManagerBuilder;
463
464 let mut builder = PluginManagerBuilder::new();
465 for plugin in plugin_list {
466 builder.register_plugin(plugin)?;
467 }
468 builder.build()?
469 } else {
470 PluginManager::new()
471 };
472
473 Ok(Configuration {
474 doc,
475 plugin_manager,
476 schema,
477 resource_manager: resource_manager
478 .unwrap_or_else(|| Arc::new(GlobalResourceManager::default())),
479 })
480 }
481}