1use mf_model::{
2 id_generator::IdGenerator, mark::Mark, node_pool::NodePool, schema::Schema,
3};
4use std::fmt::{self, Debug};
5use std::{
6 collections::HashMap,
7 sync::{
8 atomic::{AtomicU64, Ordering},
9 Arc,
10 },
11 time::Instant,
12};
13use mf_model::rpds::HashTrieMapSync;
14use crate::plugin::PluginManager;
15use crate::{ops::GlobalResourceManager, resource::Resource};
16
17use super::{
18 error::{error, StateResult},
19 plugin::{Plugin},
20 transaction::Transaction,
21};
22
/// Process-wide counter that supplies the `version` of every newly built state.
static VERSION: AtomicU64 = AtomicU64::new(1);

/// Hands out the next state version number.
///
/// The counter's current value is returned and the counter is then advanced
/// by one, so the first call yields `1` and every later call yields a value
/// one larger than the call before it.
pub fn get_state_version() -> u64 {
    const STEP: u64 = 1;
    VERSION.fetch_add(STEP, Ordering::SeqCst)
}
/// A state value bundling the configuration, the per-plugin field instances,
/// the document node pool and a version number.
///
/// Cloning copies the three `Arc` handles and the version number; the data
/// behind the handles is shared, not duplicated.
#[derive(Clone)]
pub struct State {
    /// Configuration this state was built from (plugin manager, schema,
    /// resource manager and optional document).
    pub config: Arc<Configuration>,
    /// Plugin state values, keyed by plugin key.
    pub fields_instances: Arc<HashTrieMapSync<String, Arc<dyn Resource>>>,
    /// Node pool holding the document; this is what `State::doc` returns.
    pub node_pool: Arc<NodePool>,
    /// Value obtained from `get_state_version` when the state was constructed.
    pub version: u64,
}
40impl Debug for State {
41 fn fmt(
42 &self,
43 f: &mut fmt::Formatter<'_>,
44 ) -> fmt::Result {
45 write!(
46 f,
47 "State {{ 字段数量: {} }}",
48 self.fields_instances.keys().len()
49 )
50 }
51}
52
53impl State {
54 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(state_config), fields(
59 crate_name = "state",
60 has_schema = state_config.schema.is_some(),
61 has_doc = state_config.doc.is_some()
62 )))]
63 pub async fn create(state_config: StateConfig) -> StateResult<State> {
64 tracing::info!("正在创建新的state");
65 let schema: Arc<Schema> = match &state_config.schema {
66 Some(schema) => schema.clone(),
67 None => state_config.schema.clone().ok_or_else(|| {
68 error::schema_error("必须提供结构定义".to_string())
69 })?,
70 };
71 let config = Configuration::new(
72 schema,
73 state_config.plugins.clone(),
74 state_config.doc.clone(),
75 state_config.resource_manager.clone(),
76 )
77 .await?;
78 let mut instance = State::new(Arc::new(config))?;
79 let mut field_values = Vec::new();
80 let mut fields_instances = HashTrieMapSync::new_sync();
81 for plugin in instance.config.plugin_manager.get_sorted_plugins().await
82 {
83 if let Some(field) = &plugin.spec.state_field {
84 tracing::debug!("正在初始化插件状态: {}", plugin.key);
85 let value = field.init_erased(&state_config, &instance).await;
86 field_values.push((plugin.key.clone(), value));
87 }
88 }
89 for (name, value) in field_values {
90 fields_instances.insert_mut(name, value);
91 }
92 instance.fields_instances = Arc::new(fields_instances);
93 tracing::info!("state创建成功");
94 Ok(instance)
95 }
96 pub fn new(config: Arc<Configuration>) -> StateResult<Self> {
100 let doc: Arc<NodePool> = match &config.doc {
101 Some(doc) => doc.clone(),
102 None => {
103 let id = IdGenerator::get_id();
104 let factory = config.schema.factory();
105 let nodes = factory.create_top_node(
106 Some(id.clone()),
107 None,
108 vec![],
109 None,
110 )?;
111 NodePool::from(nodes)
112 },
113 };
114
115 Ok(State {
116 fields_instances: Arc::new(HashTrieMapSync::new_sync()),
117 config,
118 node_pool: doc,
119 version: get_state_version(),
120 })
121 }
122 pub fn doc(&self) -> Arc<NodePool> {
123 Arc::clone(&self.node_pool)
124 }
125 pub fn resource_manager(&self) -> Arc<GlobalResourceManager> {
127 Arc::clone(&self.config.resource_manager)
128 }
129 pub fn schema(&self) -> Arc<Schema> {
131 Arc::clone(&self.config.schema)
132 }
133 pub async fn plugins(&self) -> Vec<Arc<Plugin>> {
135 self.config.plugin_manager.get_sorted_plugins().await
136 }
137
138 pub async fn sorted_plugins(&self) -> Vec<Arc<Plugin>> {
141 self.config.plugin_manager.get_sorted_plugins().await
143 }
144
145 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, transaction), fields(
147 crate_name = "state",
148 tr_id = %transaction.id,
149 step_count = transaction.steps.len(),
150 version = self.version
151 )))]
152 pub async fn apply(
153 self: &Arc<Self>,
154 transaction: Transaction,
155 ) -> StateResult<TransactionResult> {
156 let start_time = Instant::now();
157 let initial_step_count = transaction.steps.len();
158 tracing::info!("开始应用事务,初始步骤数: {}", initial_step_count);
159 let result = self.apply_transaction(Arc::new(transaction)).await?;
161 let duration = start_time.elapsed();
163 tracing::debug!("事务应用成功,步骤数保持不变,耗时: {:?}", duration);
164 Ok(result)
165 }
166
167 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, tr), fields(
168 crate_name = "state",
169 tr_id = %tr.id,
170 ignore_plugin = ?ignore
171 )))]
172 pub async fn filter_transaction(
173 self: &Arc<Self>,
174 tr: &Transaction,
175 ignore: Option<usize>,
176 ) -> StateResult<bool> {
177 let sorted_plugins = self.sorted_plugins().await;
179
180 for (i, plugin) in sorted_plugins.iter().enumerate() {
181 if Some(i) != ignore
182 && !plugin.apply_filter_transaction(tr, self).await
183 {
184 return Ok(false);
185 }
186 }
187 Ok(true)
188 }
189
    /// Applies `root_tr` and then lets plugins append follow-up transactions
    /// until a full pass over the plugins produces nothing new.
    ///
    /// If the plugins filter `root_tr` out, the current state is returned
    /// unchanged together with a list containing only `root_tr`.
    ///
    /// Otherwise the returned [`TransactionResult`] holds the final state and
    /// every transaction that was applied, starting with `root_tr`.
    ///
    /// # Errors
    ///
    /// Propagates errors from [`State::filter_transaction`],
    /// [`State::apply_inner`] and the plugins' `apply_append_transaction`.
    #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, root_tr), fields(
        crate_name = "state",
        tr_id = %root_tr.id,
        step_count = root_tr.steps.len()
    )))]
    pub async fn apply_transaction(
        self: &Arc<Self>,
        root_tr: Arc<Transaction>,
    ) -> StateResult<TransactionResult> {
        tracing::info!("开始应用事务");
        if !self.filter_transaction(&root_tr, None).await? {
            tracing::debug!("事务被过滤,返回原始状态");
            return Ok(TransactionResult {
                state: self.clone(),
                transactions: vec![root_tr],
            });
        }

        // `trs` accumulates every applied transaction; `new_state` is the
        // state after the most recently applied one.
        let mut trs = Vec::new();
        let mut new_state: Arc<State> = self.apply_inner(&root_tr).await?;
        trs.push(root_tr.clone());
        // Per-plugin bookkeeping, created lazily the first time a plugin
        // appends a transaction. While it is `None`, every plugin is treated
        // as having seen zero transactions with `self` as its old state.
        let mut seen: Option<Vec<SeenState>> = None;

        let sorted_plugins = self.sorted_plugins().await;

        // NOTE(review): there is no iteration cap; the loop only ends once a
        // full pass over the plugins appends nothing.
        loop {
            let mut have_new = false;
            for (i, plugin) in sorted_plugins.iter().enumerate() {
                // `n` = how many entries of `trs` this plugin has already
                // been shown; `old_state` = the state it saw last.
                let n: usize = seen.as_ref().map(|s| s[i].n).unwrap_or(0);
                let old_state =
                    seen.as_ref().map(|s| &s[i].state).unwrap_or(self);
                if n < trs.len() {
                    // Only the transactions the plugin has not seen yet are
                    // handed to it.
                    if let Some(mut tr) = plugin
                        .apply_append_transaction(
                            &trs[n..],
                            old_state,
                            &new_state,
                        )
                        .await?
                    {
                        // The appended transaction is filtered by all plugins
                        // except the one that produced it.
                        if new_state.filter_transaction(&tr, Some(i)).await? {
                            tr.set_meta("rootTr", root_tr.clone());
                            if seen.is_none() {
                                // Plugins before `i` have already been
                                // visited in this pass, so they are recorded
                                // as up to date; the rest start from scratch.
                                let mut s: Vec<SeenState> = Vec::new();
                                for j in 0..sorted_plugins.len() {
                                    s.push(if j < i {
                                        SeenState {
                                            state: new_state.clone(),
                                            n: trs.len(),
                                        }
                                    } else {
                                        SeenState { state: self.clone(), n: 0 }
                                    });
                                }
                                seen = Some(s);
                            }
                            tracing::debug!(
                                "插件 {} 添加了新事务",
                                plugin.spec.tr.metadata().name.clone()
                            );
                            new_state = new_state.apply_inner(&tr).await?;
                            trs.push(Arc::new(tr));
                            have_new = true;
                        }
                    }
                }
                // Once bookkeeping exists, mark this plugin as having seen
                // everything applied so far.
                if let Some(seen) = &mut seen {
                    seen[i] =
                        SeenState { state: new_state.clone(), n: trs.len() };
                }
            }

            if !have_new {
                // The logged count is the number of transactions in `trs`.
                tracing::info!("事务应用完成,共 {} 个步骤", trs.len());
                return Ok(TransactionResult {
                    state: new_state,
                    transactions: trs,
                });
            }
        }
    }
274
275 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, tr), fields(
277 crate_name = "state",
278 tr_id = %tr.id,
279 step_count = tr.steps.len(),
280 current_version = self.version
281 )))]
282 pub async fn apply_inner(
283 self: &Arc<Self>,
284 tr: &Transaction,
285 ) -> StateResult<Arc<State>> {
286 let mut config = self.config.as_ref().clone();
287 config.doc = Some(tr.doc());
288 let mut new_instance = State::new(Arc::new(config))?;
289 let mut fields_instances = HashTrieMapSync::new_sync();
290 let sorted_plugins = self.sorted_plugins().await;
292
293 for plugin in sorted_plugins.iter() {
294 if let Some(field) = &plugin.spec.state_field {
295 if let Some(old_plugin_state) = self.get_field(&plugin.key) {
296 let value = field
297 .apply_erased(tr, old_plugin_state, self, &new_instance)
298 .await;
299 fields_instances.insert_mut(plugin.key.clone(), value);
300 }
301 }
302 }
303 new_instance.fields_instances = Arc::new(fields_instances);
304 Ok(Arc::new(new_instance))
305 }
306
    /// Creates a new [`Transaction`] from this state via `Transaction::new`.
    ///
    /// The result is marked `#[must_use]`: building a transaction and
    /// discarding it has no effect on the state.
    #[must_use]
    pub fn tr(&self) -> Transaction {
        Transaction::new(self)
    }
311
312 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, state_config), fields(
313 crate_name = "state",
314 current_version = self.version,
315 has_plugins = state_config.plugins.is_some()
316 )))]
317 pub async fn reconfigure(
318 &self,
319 state_config: StateConfig,
320 ) -> StateResult<State> {
321 tracing::info!("正在重新配置状态");
322 let config = Configuration::new(
323 self.schema(),
324 state_config.plugins.clone(),
325 state_config.doc.clone(),
326 state_config.resource_manager.clone(),
327 )
328 .await?;
329 let mut instance = State::new(Arc::new(config))?;
330 let mut field_values = Vec::new();
331 let mut fields_instances = HashTrieMapSync::new_sync();
332 for plugin in &instance.config.plugin_manager.get_sorted_plugins().await
333 {
334 if let Some(field) = &plugin.spec.state_field {
335 let key = plugin.key.clone();
336 tracing::debug!("正在重新配置插件: {}", key);
337 let value = if self.has_field(&key) {
338 if let Some(old_plugin_state) = self.get_field(&key) {
339 old_plugin_state
340 } else {
341 field.init_erased(&state_config, &instance).await
342 }
343 } else {
344 field.init_erased(&state_config, &instance).await
345 };
346 field_values.push((key, value));
347 }
348 }
349 for (name, value) in field_values {
350 fields_instances.insert_mut(name, value);
351 }
352 instance.fields_instances = Arc::new(fields_instances);
353 tracing::info!("状态重新配置完成");
354 Ok(instance)
355 }
356
357 pub fn get_field(
358 &self,
359 name: &str,
360 ) -> Option<Arc<dyn Resource>> {
361 self.fields_instances.get(name).cloned()
362 }
363 pub fn get<T: Resource>(
364 &self,
365 name: &str,
366 ) -> Option<Arc<T>> {
367 self.fields_instances
368 .get(name)
369 .cloned()
370 .and_then(|state| state.downcast_arc::<T>().cloned())
371 }
372
373 pub fn has_field(
374 &self,
375 name: &str,
376 ) -> bool {
377 self.fields_instances.contains_key(name)
378 }
379 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self), fields(
381 crate_name = "state",
382 version = self.version,
383 doc_size = self.node_pool.size()
384 )))]
385 pub async fn serialize(&self) -> StateResult<StateSerialize> {
386 let mut state_fields: HashMap<String, Vec<u8>> = HashMap::new();
387 for plugin in self.plugins().await {
388 if let Some(state_field) = &plugin.spec.state_field {
389 if let Some(value) = self.get_field(&plugin.key) {
390 if let Some(json) = state_field.serialize_erased(value) {
391 state_fields.insert(plugin.key.clone(), json);
392 }
393 };
394 }
395 }
396 let node_pool_str =
397 serde_json::to_string(&self.doc()).map_err(|e| {
398 error::serialize_error(format!("node pool 序列化失败: {e}"))
399 })?;
400 Ok(StateSerialize {
401 state_fields: state_fields,
402 node_pool: node_pool_str.as_bytes().to_vec(),
403 })
404 }
405 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(s, configuration), fields(
407 crate_name = "state",
408 state_fields_size = s.state_fields.len(),
409 node_pool_size = s.node_pool.len()
410 )))]
411 pub async fn deserialize(
412 s: &StateSerialize,
413 configuration: &Configuration,
414 ) -> StateResult<State> {
415 let node_pool: Arc<NodePool> = serde_json::from_slice(&s.node_pool)
416 .map_err(|e| {
417 error::deserialize_error(format!("node pool 反序列化失败: {e}"))
418 })?;
419 let mut config = configuration.clone();
420 config.doc = Some(node_pool);
421 let mut state = State::new(Arc::new(config))?;
422
423 let mut map_instances = HashTrieMapSync::new_sync();
424 for plugin in &configuration.plugin_manager.get_sorted_plugins().await {
425 if let Some(state_field) = &plugin.spec.state_field {
426 if let Some(value) = s.state_fields.get(&plugin.key) {
427 if let Some(p_state) = state_field.deserialize_erased(value)
428 {
429 let key = plugin.key.clone();
430 map_instances.insert_mut(key, p_state);
431 }
432 }
433 }
434 }
435 state.fields_instances = Arc::new(map_instances);
436 Ok(state)
437 }
438}
439
/// Serialized form of a `State`, as produced by `State::serialize` and
/// consumed by `State::deserialize`.
pub struct StateSerialize {
    /// Serialized plugin state, keyed by plugin key.
    pub state_fields: HashMap<String, Vec<u8>>,
    /// The node pool encoded as JSON bytes.
    pub node_pool: Vec<u8>,
}
444
/// Input for `State::create` and `State::reconfigure`.
#[derive(Debug)]
pub struct StateConfig {
    /// Schema for the new state. `State::create` fails when this is `None`;
    /// `State::reconfigure` does not read it.
    pub schema: Option<Arc<Schema>>,
    /// Existing document to use; when `None`, `State::new` creates a top node
    /// from the schema instead.
    pub doc: Option<Arc<NodePool>>,
    /// Not read anywhere in this file.
    // NOTE(review): presumably marks carried over to the new state — confirm
    // against the consumers of this field.
    pub stored_marks: Option<Vec<Mark>>,
    /// Plugins to register; `None` results in an empty plugin manager.
    pub plugins: Option<Vec<Arc<Plugin>>>,
    /// Resource manager to share; when `None`, a default one is created.
    pub resource_manager: Option<Arc<GlobalResourceManager>>,
}
458
/// Per-plugin bookkeeping used by `State::apply_transaction` while plugins
/// append follow-up transactions.
pub struct SeenState {
    /// The state the plugin was last shown.
    state: Arc<State>,
    /// How many of the accumulated transactions the plugin has already seen.
    n: usize,
}
/// Outcome of applying a transaction to a `State`.
#[derive(Debug, Clone)]
pub struct TransactionResult {
    /// The state after all transactions were applied (or the original state
    /// when the root transaction was filtered out).
    pub state: Arc<State>,
    /// The root transaction followed by any transactions appended by plugins.
    pub transactions: Vec<Arc<Transaction>>,
}
/// Everything a `State` needs besides its plugin field values: the plugin
/// manager, an optional document, the schema and the resource manager.
#[derive(Clone, Debug)]
pub struct Configuration {
    /// Manager holding the registered plugins.
    pub plugin_manager: PluginManager,
    /// Document to use for the state; `None` makes `State::new` create one.
    pub doc: Option<Arc<NodePool>>,
    /// Schema of the document; private to this module and exposed through
    /// `State::schema`.
    schema: Arc<Schema>,
    /// Shared resource manager.
    pub resource_manager: Arc<GlobalResourceManager>,
}
480
481impl Configuration {
482 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(schema, plugins, doc, resource_manager), fields(
483 crate_name = "state",
484 plugin_count = plugins.as_ref().map(|p| p.len()).unwrap_or(0),
485 has_doc = doc.is_some()
486 )))]
487 pub async fn new(
488 schema: Arc<Schema>,
489 plugins: Option<Vec<Arc<Plugin>>>,
490 doc: Option<Arc<NodePool>>,
491 resource_manager: Option<Arc<GlobalResourceManager>>,
492 ) -> StateResult<Self> {
493 let plugin_manager = if let Some(plugin_list) = plugins {
495 use crate::plugin::PluginManagerBuilder;
496
497 let mut builder = PluginManagerBuilder::new();
498 for plugin in plugin_list {
499 builder.register_plugin(plugin)?;
500 }
501 builder.build()?
502 } else {
503 PluginManager::new()
504 };
505
506 Ok(Configuration {
507 doc,
508 plugin_manager,
509 schema,
510 resource_manager: resource_manager
511 .unwrap_or_else(|| Arc::new(GlobalResourceManager::default())),
512 })
513 }
514}