1use crate::error::PoolResult;
2use crate::{node_definition::NodeTree, tree::Tree};
3
4use super::{error::error_helpers, node::Node, types::NodeId};
5use serde::{Deserialize, Serialize};
6use std::time::Instant;
7use std::{sync::Arc};
8use rayon::prelude::*;
9use std::marker::Sync;
10use std::collections::{HashMap, HashSet};
11use lru::LruCache;
12use std::num::NonZeroUsize;
13use std::sync::atomic::{AtomicUsize, Ordering};
14
/// Monotonically increasing counter used to hand each `NodePool` created in
/// this process a unique key (see `NodePool::new`).
static POOL_ID_COUNTER: AtomicUsize = AtomicUsize::new(0);

/// Owned, thread-safe predicate over a [`Node`] (used by `OptimizedQueryEngine`).
type NodeCondition = Box<dyn Fn(&Node) -> bool + Send + Sync>;
/// Borrowing variant of [`NodeCondition`], tied to lifetime `'a`
/// (used by the builder-style `QueryEngine`).
type NodeConditionRef<'a> = Box<dyn Fn(&Node) -> bool + Send + Sync + 'a>;
21
/// Read-only, shareable view over an immutable [`Tree`] of nodes.
///
/// Cloning a pool is cheap: the tree is held behind an `Arc`.
#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
pub struct NodePool {
    /// The underlying immutable node tree, shared via `Arc`.
    inner: Arc<Tree>,
    /// Process-unique identifier (`"pool_<n>"`); used to namespace
    /// query-cache keys so caches of different pools never collide.
    key: String,
}
32
33impl NodePool {
34 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(inner), fields(
35 crate_name = "model",
36 node_count = inner.nodes.iter().map(|i| i.values().len()).sum::<usize>()
37 )))]
38 pub fn new(inner: Arc<Tree>) -> Arc<NodePool> {
39 let id = POOL_ID_COUNTER.fetch_add(1, Ordering::SeqCst);
40 let pool = Self { inner, key: format!("pool_{id}") };
41 let pool: Arc<NodePool> = Arc::new(pool);
42
43 pool
44 }
45
46 pub fn key(&self) -> &str {
48 &self.key
49 }
50
51 pub fn size(&self) -> usize {
53 self.inner.nodes.iter().map(|i| i.values().len()).sum()
54 }
55
56 pub fn root(&self) -> Arc<Node> {
57 self.inner[&self.inner.root_id].clone()
58 }
59
60 pub fn root_id(&self) -> &NodeId {
61 &self.inner.root_id
62 }
63
64 pub fn get_inner(&self) -> &Arc<Tree> {
65 &self.inner
66 }
67
68 pub fn from(nodes: NodeTree) -> Arc<NodePool> {
79 let id = POOL_ID_COUNTER.fetch_add(1, Ordering::SeqCst);
80 let pool = Self {
81 inner: Arc::new(Tree::from(nodes)),
82 key: format!("pool_{id}"),
83 };
84 let pool: Arc<NodePool> = Arc::new(pool);
85 pool
86 }
87
88 pub fn get_node(
92 &self,
93 id: &NodeId,
94 ) -> Option<Arc<Node>> {
95 self.inner.get_node(id)
96 }
97 pub fn get_parent_node(
98 &self,
99 id: &NodeId,
100 ) -> Option<Arc<Node>> {
101 self.inner.get_parent_node(id)
102 }
103
104 pub fn contains_node(
106 &self,
107 id: &NodeId,
108 ) -> bool {
109 self.inner.contains_node(id)
110 }
111
112 pub fn children(
116 &self,
117 parent_id: &NodeId,
118 ) -> Option<imbl::Vector<NodeId>> {
119 self.get_node(parent_id).map(|n| n.content.clone())
120 }
121
122 pub fn descendants(
124 &self,
125 parent_id: &NodeId,
126 ) -> Vec<Arc<Node>> {
127 let mut result: Vec<Arc<Node>> = Vec::new();
128 self._collect_descendants(parent_id, &mut result);
129 result
130 }
131
132 fn _collect_descendants(
133 &self,
134 parent_id: &NodeId,
135 result: &mut Vec<Arc<Node>>,
136 ) {
137 if let Some(children) = self.children(parent_id) {
138 for child_id in &children {
139 if let Some(child) = self.get_node(child_id) {
140 result.push(child);
141 self._collect_descendants(child_id, result);
142 }
143 }
144 }
145 }
146 pub fn for_each<F>(
147 &self,
148 id: &NodeId,
149 f: F,
150 ) where
151 F: Fn(&Arc<Node>),
152 {
153 if let Some(children) = self.children(id) {
154 for child_id in &children {
155 if let Some(child) = self.get_node(child_id) {
156 f(&child);
157 }
158 }
159 }
160 }
161 pub fn parent_id(
163 &self,
164 child_id: &NodeId,
165 ) -> Option<&NodeId> {
166 self.inner.parent_map.get(child_id)
167 }
168
169 pub fn ancestors(
171 &self,
172 child_id: &NodeId,
173 ) -> Vec<Arc<Node>> {
174 let mut chain = Vec::new();
175 let mut current_id = child_id;
176 while let Some(parent_id) = self.parent_id(current_id) {
177 if let Some(parent) = self.get_node(parent_id) {
178 chain.push(parent);
179 current_id = parent_id;
180 } else {
181 break;
182 }
183 }
184 chain
185 }
186
187 pub fn validate_hierarchy(&self) -> PoolResult<()> {
189 for (child_id, parent_id) in &self.inner.parent_map {
190 if !self.contains_node(parent_id) {
192 return Err(error_helpers::orphan_node(child_id.clone()));
193 }
194
195 if let Some(children) = self.children(parent_id) {
197 if !children.contains(child_id) {
198 return Err(error_helpers::invalid_parenting(
199 child_id.clone(),
200 parent_id.clone(),
201 ));
202 }
203 }
204 }
205 Ok(())
206 }
207
208 pub fn filter_nodes<P>(
211 &self,
212 predicate: P,
213 ) -> Vec<Arc<Node>>
214 where
215 P: Fn(&Node) -> bool,
216 {
217 self.get_all_nodes().into_iter().filter(|n| predicate(n)).collect()
218 }
219 pub fn find_node<P>(
221 &self,
222 predicate: P,
223 ) -> Option<Arc<Node>>
224 where
225 P: Fn(&Node) -> bool,
226 {
227 self.get_all_nodes().into_iter().find(|n| predicate(n))
228 }
229
230 pub fn get_node_depth(
240 &self,
241 node_id: &NodeId,
242 ) -> Option<usize> {
243 let mut depth = 0;
244 let mut current_id = node_id;
245
246 while let Some(parent_id) = self.parent_id(current_id) {
247 depth += 1;
248 current_id = parent_id;
249 }
250
251 Some(depth)
252 }
253
254 pub fn get_node_path(
264 &self,
265 node_id: &NodeId,
266 ) -> Vec<NodeId> {
267 let mut path = Vec::new();
268 let mut current_id = node_id;
269
270 while let Some(parent_id) = self.parent_id(current_id) {
271 path.push(current_id.clone());
272 current_id = parent_id;
273 }
274 path.push(current_id.clone());
275 path.reverse();
276
277 path
278 }
279 pub fn resolve(
281 &self,
282 node_id: &NodeId,
283 ) -> Vec<Arc<Node>> {
284 let mut result = Vec::new();
285 let mut current_id = node_id;
286
287 loop {
289 if let Some(node) = self.get_node(current_id) {
290 result.push(node);
291 }
292
293 if let Some(parent_id) = self.parent_id(current_id) {
294 current_id = parent_id;
295 } else {
296 break;
298 }
299 }
300
301 result.reverse();
303 result
304 }
305
306 pub fn is_leaf(
316 &self,
317 node_id: &NodeId,
318 ) -> bool {
319 if let Some(children) = self.children(node_id) {
320 children.is_empty()
321 } else {
322 true
323 }
324 }
325
326 pub fn get_left_siblings(
328 &self,
329 node_id: &NodeId,
330 ) -> Vec<NodeId> {
331 if let Some(parent_id) = self.parent_id(node_id) {
332 if let Some(siblings) = self.children(parent_id) {
333 if let Some(index) =
334 siblings.iter().position(|id| id == node_id)
335 {
336 return siblings.iter().take(index).cloned().collect();
337 } else {
338 eprintln!(
340 "Warning: Node {node_id:?} not found in parent's children list"
341 );
342 }
343 }
344 }
345 Vec::new()
346 }
347 pub fn get_right_siblings(
349 &self,
350 node_id: &NodeId,
351 ) -> Vec<NodeId> {
352 if let Some(parent_id) = self.parent_id(node_id) {
353 if let Some(siblings) = self.children(parent_id) {
354 if let Some(index) =
355 siblings.iter().position(|id| id == node_id)
356 {
357 return siblings.iter().skip(index + 1).cloned().collect();
358 } else {
359 eprintln!(
361 "Warning: Node {node_id:?} not found in parent's children list"
362 );
363 }
364 }
365 }
366 Vec::new()
367 }
368 pub fn get_left_nodes(
370 &self,
371 node_id: &NodeId,
372 ) -> Vec<Arc<Node>> {
373 let siblings = self.get_left_siblings(node_id);
374 let mut result = Vec::new();
375 for sibling_id in siblings {
376 if let Some(node) = self.get_node(&sibling_id) {
377 result.push(node);
378 }
379 }
380 result
381 }
382
383 pub fn get_right_nodes(
385 &self,
386 node_id: &NodeId,
387 ) -> Vec<Arc<Node>> {
388 let siblings = self.get_right_siblings(node_id);
389 let mut result = Vec::new();
390 for sibling_id in siblings {
391 if let Some(node) = self.get_node(&sibling_id) {
392 result.push(node);
393 }
394 }
395 result
396 }
397
398 pub fn get_all_siblings(
408 &self,
409 node_id: &NodeId,
410 ) -> Vec<NodeId> {
411 if let Some(parent_id) = self.parent_id(node_id) {
412 if let Some(children) = self.children(parent_id) {
413 return children.iter().cloned().collect();
414 }
415 }
416 Vec::new()
417 }
418
419 pub fn get_subtree_size(
429 &self,
430 node_id: &NodeId,
431 ) -> usize {
432 let mut size = 1; if let Some(children) = self.children(node_id) {
434 for child_id in &children {
435 size += self.get_subtree_size(child_id);
436 }
437 }
438 size
439 }
440
441 pub fn is_ancestor(
452 &self,
453 ancestor_id: &NodeId,
454 descendant_id: &NodeId,
455 ) -> bool {
456 let mut current_id = descendant_id;
457 while let Some(parent_id) = self.parent_id(current_id) {
458 if parent_id == ancestor_id {
459 return true;
460 }
461 current_id = parent_id;
462 }
463 false
464 }
465
466 pub fn get_lowest_common_ancestor(
477 &self,
478 node1_id: &NodeId,
479 node2_id: &NodeId,
480 ) -> Option<NodeId> {
481 let path1 = self.get_node_path(node1_id);
482 let path2 = self.get_node_path(node2_id);
483
484 for ancestor_id in path1.iter().rev() {
485 if path2.contains(ancestor_id) {
486 return Some(ancestor_id.clone());
487 }
488 }
489 None
490 }
491
492 pub fn parallel_query<P>(
502 &self,
503 predicate: P,
504 ) -> Vec<Arc<Node>>
505 where
506 P: Fn(&Node) -> bool + Send + Sync,
507 {
508 let shards: Vec<_> = self.inner.nodes.iter().collect();
510
511 shards
513 .into_par_iter()
514 .flat_map(|shard| {
515 shard
517 .values()
518 .filter(|node| predicate(node))
519 .cloned()
520 .collect::<Vec<_>>()
521 })
522 .collect()
523 }
524
525 pub fn parallel_batch_query<P>(
536 &self,
537 batch_size: usize,
538 predicate: P,
539 ) -> Vec<Arc<Node>>
540 where
541 P: Fn(&[Arc<Node>]) -> Vec<Arc<Node>> + Send + Sync,
542 {
543 let shards: Vec<_> = self.inner.nodes.iter().collect();
545
546 shards
548 .into_par_iter()
549 .flat_map(|shard| {
550 let nodes: Vec<_> = shard.values().cloned().collect();
552
553 nodes
555 .chunks(batch_size)
556 .flat_map(&predicate)
557 .collect::<Vec<_>>()
558 })
559 .collect()
560 }
561
562 pub fn parallel_query_map<P, T, F>(
573 &self,
574 predicate: P,
575 transform: F,
576 ) -> Vec<T>
577 where
578 P: Fn(&Node) -> bool + Send + Sync,
579 F: Fn(&Arc<Node>) -> T + Send + Sync,
580 T: Send,
581 {
582 let shards: Vec<_> = self.inner.nodes.iter().collect();
584
585 shards
587 .into_par_iter()
588 .flat_map(|shard| {
589 shard
591 .values()
592 .filter(|node| predicate(node))
593 .map(&transform)
594 .collect::<Vec<T>>()
595 })
596 .collect()
597 }
598
599 pub fn parallel_query_reduce<P, T, F>(
611 &self,
612 predicate: P,
613 init: T,
614 fold: F,
615 ) -> T
616 where
617 P: Fn(&Node) -> bool + Send + Sync,
618 F: Fn(T, &Arc<Node>) -> T + Send + Sync,
619 T: Send + Sync + Clone,
620 {
621 let dummy_node = Arc::new(Node::new(
622 "",
623 "".to_string(),
624 Default::default(),
625 vec![],
626 vec![],
627 ));
628
629 let shards: Vec<_> = self.inner.nodes.iter().collect();
631
632 shards
634 .into_par_iter()
635 .map(|shard| {
636 shard
638 .values()
639 .filter(|node| predicate(node))
640 .fold(init.clone(), &fold)
641 })
642 .reduce(|| init.clone(), |a, _b| fold(a, &dummy_node))
644 }
645
646 fn get_all_nodes(&self) -> Vec<Arc<Node>> {
648 let mut result = Vec::new();
649 for shard in &self.inner.nodes {
650 for node in shard.values() {
651 result.push(node.clone());
652 }
653 }
654 result
655 }
656}
657
/// Builder-style query over a borrowed [`NodePool`]: chain `by_*` methods
/// to accumulate conditions, then run `find_all`/`find_first`/`count`
/// (or their parallel variants). All conditions are AND-ed together.
pub struct QueryEngine<'a> {
    /// Pool the query runs against (borrowed for the engine's lifetime).
    pool: &'a NodePool,
    /// Accumulated predicates; a node matches when all of them hold.
    conditions: Vec<NodeConditionRef<'a>>,
}
663
664impl<'a> QueryEngine<'a> {
665 pub fn new(pool: &'a NodePool) -> Self {
667 Self { pool, conditions: Vec::new() }
668 }
669
670 pub fn by_type(
672 mut self,
673 node_type: &'a str,
674 ) -> Self {
675 let node_type = node_type.to_string();
676 self.conditions.push(Box::new(move |node| node.r#type == node_type));
677 self
678 }
679
680 pub fn by_attr(
682 mut self,
683 key: &'a str,
684 value: &'a serde_json::Value,
685 ) -> Self {
686 let key = key.to_string();
687 let value = value.clone();
688 self.conditions
689 .push(Box::new(move |node| node.attrs.get(&key) == Some(&value)));
690 self
691 }
692
693 pub fn by_mark(
695 mut self,
696 mark_type: &'a str,
697 ) -> Self {
698 let mark_type = mark_type.to_string();
699 self.conditions.push(Box::new(move |node| {
700 node.marks.iter().any(|mark| mark.r#type == mark_type)
701 }));
702 self
703 }
704
705 pub fn by_child_count(
707 mut self,
708 count: usize,
709 ) -> Self {
710 self.conditions.push(Box::new(move |node| node.content.len() == count));
711 self
712 }
713
714 pub fn by_depth(
716 mut self,
717 depth: usize,
718 ) -> Self {
719 let pool = self.pool.clone();
720 self.conditions.push(Box::new(move |node| {
721 pool.get_node_depth(&node.id) == Some(depth)
722 }));
723 self
724 }
725
726 pub fn by_ancestor_type(
728 mut self,
729 ancestor_type: &'a str,
730 ) -> Self {
731 let pool = self.pool.clone();
732 let ancestor_type = ancestor_type.to_string();
733 self.conditions.push(Box::new(move |node| {
734 pool.ancestors(&node.id)
735 .iter()
736 .any(|ancestor| ancestor.r#type == ancestor_type)
737 }));
738 self
739 }
740
741 pub fn by_descendant_type(
743 mut self,
744 descendant_type: &'a str,
745 ) -> Self {
746 let pool = self.pool.clone();
747 let descendant_type = descendant_type.to_string();
748 self.conditions.push(Box::new(move |node| {
749 pool.descendants(&node.id)
750 .iter()
751 .any(|descendant| descendant.r#type == descendant_type)
752 }));
753 self
754 }
755
756 pub fn find_all(&self) -> Vec<Arc<Node>> {
758 self.pool
759 .get_all_nodes()
760 .into_iter()
761 .filter(|node| {
762 self.conditions.iter().all(|condition| condition(node))
763 })
764 .collect()
765 }
766
767 pub fn find_first(&self) -> Option<Arc<Node>> {
769 self.pool.get_all_nodes().into_iter().find(|node| {
770 self.conditions.iter().all(|condition| condition(node))
771 })
772 }
773
774 pub fn count(&self) -> usize {
776 self.pool
777 .get_all_nodes()
778 .into_iter()
779 .filter(|node| {
780 self.conditions.iter().all(|condition| condition(node))
781 })
782 .count()
783 }
784
785 pub fn parallel_find_all(&self) -> Vec<Arc<Node>> {
787 let conditions = &self.conditions;
788 self.pool.parallel_query(|node| {
789 conditions.iter().all(|condition| condition(node))
790 })
791 }
792
793 pub fn parallel_find_first(&self) -> Option<Arc<Node>> {
795 let conditions = &self.conditions;
796 self.pool.get_all_nodes().into_par_iter().find_any(move |node| {
797 conditions.iter().all(|condition| condition(node))
798 })
799 }
800
801 pub fn parallel_count(&self) -> usize {
803 let conditions = &self.conditions;
804 self.pool
805 .get_all_nodes()
806 .into_par_iter()
807 .filter(move |node| {
808 conditions.iter().all(|condition| condition(node))
809 })
810 .count()
811 }
812}
813
impl NodePool {
    /// Entry point for the builder-style [`QueryEngine`], borrowing this
    /// pool for the engine's lifetime.
    pub fn query(&self) -> QueryEngine<'_> {
        QueryEngine::new(self)
    }
}
820
/// Configuration for the result cache of [`OptimizedQueryEngine`].
#[derive(Clone, Debug)]
pub struct QueryCacheConfig {
    /// Maximum number of cached query results; must be > 0 when
    /// `enabled` is true (enforced in `OptimizedQueryEngine::new`).
    pub capacity: usize,
    /// Whether result caching is active at all.
    pub enabled: bool,
}
829
830impl Default for QueryCacheConfig {
831 fn default() -> Self {
832 Self { capacity: 1000, enabled: true }
833 }
834}
835
/// Query engine that eagerly builds type/depth/mark indices over a pool
/// snapshot and optionally caches query results in an LRU cache.
pub struct OptimizedQueryEngine {
    /// Snapshot of the pool the indices were built from.
    pool: Arc<NodePool>,
    /// LRU cache of query results, keyed by a hashed cache key;
    /// `None` when caching is disabled in the config.
    cache: Option<LruCache<String, Vec<Arc<Node>>>>,
    /// node `type` -> nodes of that type.
    type_index: HashMap<String, Vec<Arc<Node>>>,
    /// depth (root = 0) -> nodes at that depth.
    depth_index: HashMap<usize, Vec<Arc<Node>>>,
    /// mark `type` -> nodes carrying that mark.
    mark_index: HashMap<String, Vec<Arc<Node>>>,
}
844
845impl OptimizedQueryEngine {
846 pub fn new(
847 pool: &NodePool,
848 config: QueryCacheConfig,
849 ) -> PoolResult<Self> {
850 let mut engine = Self {
851 pool: Arc::new(pool.clone()),
852 cache: if config.enabled {
853 Some(LruCache::new(
854 NonZeroUsize::new(config.capacity).ok_or_else(|| {
855 anyhow::anyhow!("query cache capacity must be > 0")
856 })?,
857 ))
858 } else {
859 None
860 },
861 type_index: HashMap::new(),
862 depth_index: HashMap::new(),
863 mark_index: HashMap::new(),
864 };
865 let start = Instant::now();
866 engine.build_indices()?;
867 let duration = start.elapsed();
868 println!("索引构建完成,耗时: {duration:?}");
869 Ok(engine)
870 }
871
872 fn build_indices(&mut self) -> PoolResult<()> {
874 use rayon::prelude::*;
875 use std::collections::HashMap;
876 use std::sync::Mutex;
877
878 use std::sync::Arc;
879 let node_count = self.pool.size();
881
882 let type_index =
884 Arc::new(Mutex::new(HashMap::with_capacity(node_count / 5)));
885 let depth_index = Arc::new(Mutex::new(HashMap::with_capacity(10)));
886 let mark_index =
887 Arc::new(Mutex::new(HashMap::with_capacity(node_count / 10)));
888
889 let optimal_shard_size = 1000; let mut all_nodes: Vec<_> = self
894 .pool
895 .inner
896 .nodes
897 .iter()
898 .flat_map(|shard| shard.values().cloned())
899 .collect();
900
901 all_nodes.sort_by(|a, b| a.id.cmp(&b.id));
903
904 let shards: Vec<_> = all_nodes.chunks(optimal_shard_size).collect();
906
907 shards.into_par_iter().for_each(|shard| {
909 let mut local_type_index = HashMap::with_capacity(shard.len() / 5);
911 let mut local_depth_index = HashMap::with_capacity(5);
912 let mut local_mark_index = HashMap::with_capacity(shard.len() / 10);
913
914 let mut type_nodes = Vec::with_capacity(shard.len());
916 let mut depth_nodes = Vec::with_capacity(shard.len());
917 let mut mark_nodes = Vec::with_capacity(shard.len() * 2);
918
919 for node in shard {
921 type_nodes.push((node.r#type.clone(), Arc::clone(node)));
923
924 if let Some(depth) = self.pool.get_node_depth(&node.id) {
926 depth_nodes.push((depth, Arc::clone(node)));
927 }
928
929 for mark in &node.marks {
931 mark_nodes.push((mark.r#type.clone(), Arc::clone(node)));
932 }
933 }
934
935 for (type_name, node) in type_nodes {
937 local_type_index
938 .entry(type_name)
939 .or_insert_with(|| Vec::with_capacity(shard.len() / 5))
940 .push(node);
941 }
942
943 for (depth, node) in depth_nodes {
944 local_depth_index
945 .entry(depth)
946 .or_insert_with(|| Vec::with_capacity(shard.len() / 10))
947 .push(node);
948 }
949
950 for (mark_type, node) in mark_nodes {
951 local_mark_index
952 .entry(mark_type)
953 .or_insert_with(|| Vec::with_capacity(shard.len() / 10))
954 .push(node);
955 }
956
957 {
959 if let Ok(mut type_idx) = type_index.lock() {
960 for (k, v) in local_type_index {
961 type_idx
962 .entry(k)
963 .or_insert_with(|| Vec::with_capacity(v.len()))
964 .extend(v);
965 }
966 } else {
967 return;
968 }
969 }
970 {
971 if let Ok(mut depth_idx) = depth_index.lock() {
972 for (k, v) in local_depth_index {
973 depth_idx
974 .entry(k)
975 .or_insert_with(|| Vec::with_capacity(v.len()))
976 .extend(v);
977 }
978 } else {
979 return;
980 }
981 }
982 {
983 if let Ok(mut mark_idx) = mark_index.lock() {
984 for (k, v) in local_mark_index {
985 mark_idx
986 .entry(k)
987 .or_insert_with(|| Vec::with_capacity(v.len()))
988 .extend(v);
989 }
990 }
991 }
992 });
993
994 self.type_index = Arc::try_unwrap(type_index)
996 .map_err(|_| anyhow::anyhow!("type_index still has refs"))?
997 .into_inner()
998 .map_err(|_| anyhow::anyhow!("type_index poisoned"))?;
999 self.depth_index = Arc::try_unwrap(depth_index)
1000 .map_err(|_| anyhow::anyhow!("depth_index still has refs"))?
1001 .into_inner()
1002 .map_err(|_| anyhow::anyhow!("depth_index poisoned"))?;
1003 self.mark_index = Arc::try_unwrap(mark_index)
1004 .map_err(|_| anyhow::anyhow!("mark_index still has refs"))?
1005 .into_inner()
1006 .map_err(|_| anyhow::anyhow!("mark_index poisoned"))?;
1007 Ok(())
1008 }
1009
1010 pub fn by_type(
1012 &self,
1013 node_type: &str,
1014 ) -> Vec<Arc<Node>> {
1015 self.type_index.get(node_type).cloned().unwrap_or_default()
1016 }
1017
1018 pub fn by_depth(
1020 &self,
1021 depth: usize,
1022 ) -> Vec<Arc<Node>> {
1023 self.depth_index.get(&depth).cloned().unwrap_or_default()
1024 }
1025
1026 pub fn by_mark(
1028 &self,
1029 mark_type: &str,
1030 ) -> Vec<Arc<Node>> {
1031 self.mark_index.get(mark_type).cloned().unwrap_or_default()
1032 }
1033
1034 pub fn query(
1036 &mut self,
1037 conditions: Vec<NodeCondition>,
1038 ) -> Vec<Arc<Node>> {
1039 let cache_key = self.generate_query_cache_key(&conditions);
1041
1042 if let Some(cache) = &self.cache {
1044 if let Some(cached) = cache.peek(&cache_key) {
1045 return cached.clone();
1046 }
1047 }
1048
1049 let mut candidates: Option<Vec<Arc<Node>>> = None;
1051
1052 for condition in &conditions {
1054 if let Some(indexed) = self.get_indexed_nodes(condition) {
1055 candidates = match candidates {
1056 None => Some(indexed),
1057 Some(existing) => {
1058 Some(self.intersect_nodes(&existing, &indexed))
1059 },
1060 };
1061 }
1062 }
1063
1064 let result: Vec<Arc<Node>> = match candidates {
1065 Some(nodes) => {
1066 nodes
1068 .par_iter()
1069 .filter(|node| {
1070 conditions.iter().all(|condition| condition(node))
1071 })
1072 .cloned()
1073 .collect()
1074 },
1075 None => {
1076 self.pool
1078 .parallel_query(|node| {
1079 conditions.iter().all(|condition| condition(node))
1080 })
1081 .into_iter()
1082 .collect()
1083 },
1084 };
1085
1086 if let Some(cache) = &mut self.cache {
1088 cache.put(cache_key, result.clone());
1089 }
1090
1091 result
1092 }
1093
1094 fn generate_query_cache_key(
1096 &self,
1097 conditions: &[NodeCondition],
1098 ) -> String {
1099 use std::collections::hash_map::DefaultHasher;
1100 use std::hash::{Hash, Hasher};
1101
1102 let mut hasher = DefaultHasher::new();
1103
1104 conditions.len().hash(&mut hasher);
1106 self.pool.key().hash(&mut hasher);
1107
1108 for (i, _condition) in conditions.iter().enumerate() {
1110 i.hash(&mut hasher);
1112 std::ptr::addr_of!(_condition).hash(&mut hasher);
1113 }
1114
1115 format!("query_{:x}", hasher.finish())
1116 }
1117
1118 fn get_indexed_nodes(
1120 &self,
1121 condition: &(dyn Fn(&Node) -> bool + Send + Sync),
1122 ) -> Option<Vec<Arc<Node>>> {
1123 if let Some(type_nodes) = self.type_index.get("document") {
1125 if condition(&type_nodes[0]) {
1126 return Some(type_nodes.clone());
1127 }
1128 }
1129
1130 if let Some(depth_nodes) = self.depth_index.get(&0) {
1132 if condition(&depth_nodes[0]) {
1133 return Some(depth_nodes.clone());
1134 }
1135 }
1136
1137 for mark_nodes in self.mark_index.values() {
1139 if !mark_nodes.is_empty() && condition(&mark_nodes[0]) {
1140 return Some(mark_nodes.clone());
1141 }
1142 }
1143
1144 None
1145 }
1146
1147 fn intersect_nodes(
1149 &self,
1150 nodes1: &[Arc<Node>],
1151 nodes2: &[Arc<Node>],
1152 ) -> Vec<Arc<Node>> {
1153 let set1: HashSet<_> = nodes1.iter().map(|n| n.id.as_ref()).collect();
1154 nodes2
1155 .iter()
1156 .filter(|node| set1.contains(node.id.as_ref()))
1157 .cloned()
1158 .collect()
1159 }
1160}
1161
impl NodePool {
    /// Builds an [`OptimizedQueryEngine`] (eager indices + LRU result
    /// cache) over a clone of this pool.
    ///
    /// # Errors
    /// Fails if `config` enables caching with a zero capacity, or if
    /// index construction fails.
    pub fn optimized_query(
        &self,
        config: QueryCacheConfig,
    ) -> PoolResult<OptimizedQueryEngine> {
        OptimizedQueryEngine::new(self, config)
    }
}
1171
1172impl Clone for OptimizedQueryEngine {
1174 fn clone(&self) -> Self {
1175 Self {
1176 pool: self.pool.clone(),
1177 cache: self.cache.clone(),
1178 type_index: self.type_index.clone(),
1179 depth_index: self.depth_index.clone(),
1180 mark_index: self.mark_index.clone(),
1181 }
1182 }
1183}
1184
/// Configuration for [`LazyQueryEngine`].
#[derive(Clone, Debug)]
pub struct LazyQueryConfig {
    /// Capacity of the combined-query result cache; must be > 0 when
    /// `cache_enabled` is true (enforced in `LazyQueryEngine::new`).
    pub cache_capacity: usize,
    /// Capacity of each per-kind index cache (type/depth/mark); must be > 0.
    pub index_cache_capacity: usize,
    /// Whether query-result caching is active.
    pub cache_enabled: bool,
    /// Number of queries on the same key after which `combined_query`
    /// starts using the corresponding lazily built index.
    pub index_build_threshold: usize,
}
1197
1198impl Default for LazyQueryConfig {
1199 fn default() -> Self {
1200 Self {
1201 cache_capacity: 1000,
1202 index_cache_capacity: 100,
1203 cache_enabled: true,
1204 index_build_threshold: 5,
1205 }
1206 }
1207}
1208
/// Per-key usage statistics used to decide when an index is worth building.
#[derive(Debug, Clone)]
pub struct QueryStats {
    // Number of times this key has been queried.
    count: usize,
    // Timestamp of the most recent query on this key.
    last_query: Instant,
}
1217
/// Query engine that builds type/depth/mark indices lazily: an index is
/// only constructed (and LRU-cached) once a key has been queried often
/// enough (see [`LazyQueryConfig::index_build_threshold`]).
pub struct LazyQueryEngine {
    // Snapshot of the pool the engine queries.
    pool: Arc<NodePool>,

    // LRU cache of full query results; `None` when caching is disabled.
    query_cache: Option<LruCache<String, Vec<Arc<Node>>>>,

    // Lazily built per-key indices, LRU-evicted independently per kind.
    type_index_cache: LruCache<String, Vec<Arc<Node>>>,
    depth_index_cache: LruCache<usize, Vec<Arc<Node>>>,
    mark_index_cache: LruCache<String, Vec<Arc<Node>>>,

    // Usage counters driving the "is an index worth it yet" decision.
    type_query_stats: HashMap<String, QueryStats>,
    depth_query_stats: HashMap<usize, QueryStats>,
    mark_query_stats: HashMap<String, QueryStats>,

    // Thresholds and capacities supplied at construction.
    config: LazyQueryConfig,
}
1238
1239impl LazyQueryEngine {
1242 pub fn new(
1243 pool: &NodePool,
1244 config: LazyQueryConfig,
1245 ) -> Self {
1246 Self {
1247 pool: Arc::new(pool.clone()),
1248 query_cache: if config.cache_enabled {
1249 Some(LruCache::new(
1250 NonZeroUsize::new(config.cache_capacity)
1251 .expect("cache_capacity > 0"),
1252 ))
1253 } else {
1254 None
1255 },
1256 type_index_cache: LruCache::new(
1257 NonZeroUsize::new(config.index_cache_capacity)
1258 .expect("index_cache_capacity > 0"),
1259 ),
1260 depth_index_cache: LruCache::new(
1261 NonZeroUsize::new(config.index_cache_capacity)
1262 .expect("index_cache_capacity > 0"),
1263 ),
1264 mark_index_cache: LruCache::new(
1265 NonZeroUsize::new(config.index_cache_capacity)
1266 .expect("index_cache_capacity > 0"),
1267 ),
1268 type_query_stats: HashMap::new(),
1269 depth_query_stats: HashMap::new(),
1270 mark_query_stats: HashMap::new(),
1271 config,
1272 }
1273 }
1274
1275 pub fn by_type_lazy(
1277 &mut self,
1278 node_type: &str,
1279 ) -> Vec<Arc<Node>> {
1280 self.update_type_stats(node_type);
1282
1283 if let Some(cached) = self.type_index_cache.get(node_type) {
1285 return cached.clone();
1286 }
1287
1288 let start = Instant::now();
1290 let nodes = self.build_type_index(node_type);
1291 let duration = start.elapsed();
1292
1293 println!(
1294 "实时构建类型索引 '{}', 耗时: {:?}, 节点数: {}",
1295 node_type,
1296 duration,
1297 nodes.len()
1298 );
1299
1300 self.type_index_cache.put(node_type.to_string(), nodes.clone());
1302
1303 nodes
1304 }
1305
1306 pub fn by_depth_lazy(
1308 &mut self,
1309 depth: usize,
1310 ) -> Vec<Arc<Node>> {
1311 self.update_depth_stats(depth);
1312
1313 if let Some(cached) = self.depth_index_cache.get(&depth) {
1314 return cached.clone();
1315 }
1316
1317 let start = Instant::now();
1318 let nodes = self.build_depth_index(depth);
1319 let duration = start.elapsed();
1320
1321 println!(
1322 "实时构建深度索引 {}, 耗时: {:?}, 节点数: {}",
1323 depth,
1324 duration,
1325 nodes.len()
1326 );
1327
1328 self.depth_index_cache.put(depth, nodes.clone());
1329 nodes
1330 }
1331
1332 pub fn by_mark_lazy(
1334 &mut self,
1335 mark_type: &str,
1336 ) -> Vec<Arc<Node>> {
1337 self.update_mark_stats(mark_type);
1338
1339 if let Some(cached) = self.mark_index_cache.get(mark_type) {
1340 return cached.clone();
1341 }
1342
1343 let start = Instant::now();
1344 let nodes = self.build_mark_index(mark_type);
1345 let duration = start.elapsed();
1346
1347 println!(
1348 "实时构建标记索引 '{}', 耗时: {:?}, 节点数: {}",
1349 mark_type,
1350 duration,
1351 nodes.len()
1352 );
1353
1354 self.mark_index_cache.put(mark_type.to_string(), nodes.clone());
1355 nodes
1356 }
1357
1358 pub fn smart_query<F>(
1360 &mut self,
1361 query_name: &str,
1362 query_fn: F,
1363 ) -> Vec<Arc<Node>>
1364 where
1365 F: Fn() -> Vec<Arc<Node>>,
1366 {
1367 let cache_key = self.generate_cache_key(query_name);
1369
1370 if let Some(cache) = &self.query_cache {
1372 if let Some(cached) = cache.peek(&cache_key) {
1373 return cached.clone();
1374 }
1375 }
1376
1377 let start = Instant::now();
1379 let result = query_fn();
1380 let duration = start.elapsed();
1381
1382 println!(
1383 "执行查询 '{}', 耗时: {:?}, 结果数: {}",
1384 query_name,
1385 duration,
1386 result.len()
1387 );
1388
1389 if let Some(cache) = &mut self.query_cache {
1391 cache.put(cache_key, result.clone());
1392 }
1393
1394 result
1395 }
1396
1397 pub fn combined_query(
1399 &mut self,
1400 conditions: &[QueryCondition],
1401 ) -> Vec<Arc<Node>> {
1402 let cache_key = self.generate_combined_cache_key(conditions);
1403
1404 if let Some(cache) = &self.query_cache {
1406 if let Some(cached) = cache.peek(&cache_key) {
1407 return cached.clone();
1408 }
1409 }
1410
1411 let mut candidates: Option<Vec<Arc<Node>>> = None;
1412
1413 for condition in conditions {
1415 let indexed_nodes = match condition {
1416 QueryCondition::ByType(type_name) => {
1417 if self.should_use_type_index(type_name) {
1418 Some(self.by_type_lazy(type_name))
1419 } else {
1420 None
1421 }
1422 },
1423 QueryCondition::ByDepth(depth) => {
1424 if self.should_use_depth_index(*depth) {
1425 Some(self.by_depth_lazy(*depth))
1426 } else {
1427 None
1428 }
1429 },
1430 QueryCondition::ByMark(mark_type) => {
1431 if self.should_use_mark_index(mark_type) {
1432 Some(self.by_mark_lazy(mark_type))
1433 } else {
1434 None
1435 }
1436 },
1437 QueryCondition::ByAttr { .. }
1438 | QueryCondition::IsLeaf
1439 | QueryCondition::HasChildren => None,
1440 };
1441
1442 if let Some(indexed) = indexed_nodes {
1443 candidates = match candidates {
1444 None => Some(indexed),
1445 Some(existing) => {
1446 Some(self.intersect_nodes(&existing, &indexed))
1447 },
1448 };
1449 }
1450 }
1451
1452 let result = match candidates {
1454 Some(nodes) => nodes
1455 .into_par_iter()
1456 .filter(|node| conditions.iter().all(|cond| cond.matches(node)))
1457 .collect(),
1458 None => {
1459 self.pool.parallel_query(|node| {
1461 conditions.iter().all(|cond| cond.matches(node))
1462 })
1463 },
1464 };
1465
1466 if let Some(cache) = &mut self.query_cache {
1468 cache.put(cache_key, result.clone());
1469 }
1470
1471 result
1472 }
1473
1474 fn update_type_stats(
1477 &mut self,
1478 type_name: &str,
1479 ) {
1480 let stats = self
1481 .type_query_stats
1482 .entry(type_name.to_string())
1483 .or_insert(QueryStats { count: 0, last_query: Instant::now() });
1484 stats.count += 1;
1485 stats.last_query = Instant::now();
1486 }
1487
1488 fn update_depth_stats(
1489 &mut self,
1490 depth: usize,
1491 ) {
1492 let stats = self
1493 .depth_query_stats
1494 .entry(depth)
1495 .or_insert(QueryStats { count: 0, last_query: Instant::now() });
1496 stats.count += 1;
1497 stats.last_query = Instant::now();
1498 }
1499
1500 fn update_mark_stats(
1501 &mut self,
1502 mark_type: &str,
1503 ) {
1504 let stats = self
1505 .mark_query_stats
1506 .entry(mark_type.to_string())
1507 .or_insert(QueryStats { count: 0, last_query: Instant::now() });
1508 stats.count += 1;
1509 stats.last_query = Instant::now();
1510 }
1511
1512 fn should_use_type_index(
1513 &self,
1514 type_name: &str,
1515 ) -> bool {
1516 self.type_query_stats
1517 .get(type_name)
1518 .map(|stats| stats.count >= self.config.index_build_threshold)
1519 .unwrap_or(false)
1520 }
1521
1522 fn should_use_depth_index(
1523 &self,
1524 depth: usize,
1525 ) -> bool {
1526 self.depth_query_stats
1527 .get(&depth)
1528 .map(|stats| stats.count >= self.config.index_build_threshold)
1529 .unwrap_or(false)
1530 }
1531
1532 fn should_use_mark_index(
1533 &self,
1534 mark_type: &str,
1535 ) -> bool {
1536 self.mark_query_stats
1537 .get(mark_type)
1538 .map(|stats| stats.count >= self.config.index_build_threshold)
1539 .unwrap_or(false)
1540 }
1541
1542 fn build_type_index(
1543 &self,
1544 node_type: &str,
1545 ) -> Vec<Arc<Node>> {
1546 self.pool.parallel_query(|node| node.r#type == node_type)
1547 }
1548
1549 fn build_depth_index(
1550 &self,
1551 target_depth: usize,
1552 ) -> Vec<Arc<Node>> {
1553 self.pool.parallel_query(|node| {
1554 self.pool
1555 .get_node_depth(&node.id)
1556 .map(|depth| depth == target_depth)
1557 .unwrap_or(false)
1558 })
1559 }
1560
1561 fn build_mark_index(
1562 &self,
1563 mark_type: &str,
1564 ) -> Vec<Arc<Node>> {
1565 self.pool.parallel_query(|node| {
1566 node.marks.iter().any(|mark| mark.r#type == mark_type)
1567 })
1568 }
1569
1570 fn generate_cache_key(
1571 &self,
1572 query_name: &str,
1573 ) -> String {
1574 use std::collections::hash_map::DefaultHasher;
1575 use std::hash::{Hash, Hasher};
1576
1577 let mut hasher = DefaultHasher::new();
1578 query_name.hash(&mut hasher);
1579 self.pool.key().hash(&mut hasher);
1580 format!("query_{:x}", hasher.finish())
1581 }
1582
1583 fn generate_combined_cache_key(
1584 &self,
1585 conditions: &[QueryCondition],
1586 ) -> String {
1587 use std::collections::hash_map::DefaultHasher;
1588 use std::hash::{Hash, Hasher};
1589
1590 let mut hasher = DefaultHasher::new();
1591 for condition in conditions {
1592 condition.cache_key().hash(&mut hasher);
1593 }
1594 self.pool.key().hash(&mut hasher);
1595 format!("combined_{:x}", hasher.finish())
1596 }
1597
1598 fn intersect_nodes(
1599 &self,
1600 nodes1: &[Arc<Node>],
1601 nodes2: &[Arc<Node>],
1602 ) -> Vec<Arc<Node>> {
1603 let set1: HashSet<_> = nodes1.iter().map(|n| n.id.as_ref()).collect();
1604 nodes2
1605 .iter()
1606 .filter(|node| set1.contains(node.id.as_ref()))
1607 .cloned()
1608 .collect()
1609 }
1610
1611 pub fn get_query_stats(&self) -> QueryStatsSummary {
1613 QueryStatsSummary {
1614 type_queries: self.type_query_stats.clone(),
1615 depth_queries: self.depth_query_stats.clone(),
1616 mark_queries: self.mark_query_stats.clone(),
1617 cache_hit_rates: self.calculate_cache_hit_rates(),
1618 }
1619 }
1620
1621 fn calculate_cache_hit_rates(&self) -> CacheHitRates {
1622 CacheHitRates {
1623 query_cache_size: self
1624 .query_cache
1625 .as_ref()
1626 .map(|c| c.len())
1627 .unwrap_or(0),
1628 type_index_cache_size: self.type_index_cache.len(),
1629 depth_index_cache_size: self.depth_index_cache.len(),
1630 mark_index_cache_size: self.mark_index_cache.len(),
1631 }
1632 }
1633}
1634
/// A single, serializable-by-key query condition for
/// [`LazyQueryEngine::combined_query`]; conditions are AND-ed together.
#[derive(Debug, Clone)]
pub enum QueryCondition {
    /// Node `type` equals the given string.
    ByType(String),
    /// Node sits at the given depth (root = 0); only enforced via the
    /// depth index — see the note on `matches`.
    ByDepth(usize),
    /// Node carries a mark of the given type.
    ByMark(String),
    /// Attribute `key` equals `value` exactly.
    ByAttr { key: String, value: serde_json::Value },
    /// Node has no children.
    IsLeaf,
    /// Node has at least one child.
    HasChildren,
}
1645
impl QueryCondition {
    /// Whether `node` satisfies this condition, judged from the node alone.
    pub fn matches(
        &self,
        node: &Node,
    ) -> bool {
        match self {
            QueryCondition::ByType(type_name) => node.r#type == *type_name,
            // NOTE(review): depth cannot be computed from a `Node` alone
            // (it needs a pool), so `ByDepth` matches unconditionally
            // here; depth filtering only takes effect through the index
            // path in `LazyQueryEngine::combined_query`. An un-indexed
            // `ByDepth` is therefore a no-op filter — confirm intended.
            QueryCondition::ByDepth(_) => true,
            QueryCondition::ByMark(mark_type) => {
                node.marks.iter().any(|mark| mark.r#type == *mark_type)
            },
            QueryCondition::ByAttr { key, value } => {
                node.attrs.get(key) == Some(value)
            },
            QueryCondition::IsLeaf => node.content.is_empty(),
            QueryCondition::HasChildren => !node.content.is_empty(),
        }
    }

    /// Stable string key identifying this condition for result caching.
    pub fn cache_key(&self) -> String {
        match self {
            QueryCondition::ByType(t) => format!("type_{t}"),
            QueryCondition::ByDepth(d) => format!("depth_{d}"),
            QueryCondition::ByMark(m) => format!("mark_{m}"),
            QueryCondition::ByAttr { key, value } => {
                format!(
                    "attr_{}_{}",
                    key,
                    // Unserializable values degrade to "" rather than fail.
                    serde_json::to_string(value).unwrap_or_default()
                )
            },
            QueryCondition::IsLeaf => "is_leaf".to_string(),
            QueryCondition::HasChildren => "has_children".to_string(),
        }
    }
}
1682
/// Snapshot returned by [`LazyQueryEngine::get_query_stats`].
#[derive(Debug)]
pub struct QueryStatsSummary {
    /// Per-type-key usage statistics.
    pub type_queries: HashMap<String, QueryStats>,
    /// Per-depth-key usage statistics.
    pub depth_queries: HashMap<usize, QueryStats>,
    /// Per-mark-key usage statistics.
    pub mark_queries: HashMap<String, QueryStats>,
    /// Current cache occupancy counts.
    pub cache_hit_rates: CacheHitRates,
}
1691
/// Current entry counts of the engine's caches.
/// NOTE(review): despite the name, these are cache *sizes*, not hit
/// rates — no hit/miss counters are tracked.
#[derive(Debug)]
pub struct CacheHitRates {
    /// Entries in the combined-query result cache (0 when disabled).
    pub query_cache_size: usize,
    /// Entries in the lazily built type index cache.
    pub type_index_cache_size: usize,
    /// Entries in the lazily built depth index cache.
    pub depth_index_cache_size: usize,
    /// Entries in the lazily built mark index cache.
    pub mark_index_cache_size: usize,
}
1700
impl NodePool {
    /// Builds a [`LazyQueryEngine`] (on-demand indices + LRU result
    /// cache) over a clone of this pool.
    ///
    /// # Panics
    /// Panics if `config` contains a zero cache capacity (see
    /// `LazyQueryEngine::new`).
    pub fn lazy_query(
        &self,
        config: LazyQueryConfig,
    ) -> LazyQueryEngine {
        LazyQueryEngine::new(self, config)
    }
}
1711
1712