1use crate::error::PoolResult;
2use crate::{node_type::NodeEnum, tree::Tree};
3
4use super::{error::error_helpers, node::Node, types::NodeId};
5use serde::{Deserialize, Serialize};
6use std::time::Instant;
7use std::{sync::Arc};
8use rayon::prelude::*;
9use std::marker::Sync;
10use std::collections::{HashMap, HashSet};
11use lru::LruCache;
12use std::num::NonZeroUsize;
13use std::sync::atomic::{AtomicUsize, Ordering};
14
15static POOL_ID_COUNTER: AtomicUsize = AtomicUsize::new(0);
17
/// A shareable, read-only view over a node [`Tree`].
///
/// Cloning is cheap: `inner` is an `Arc`, so clones share the same tree.
#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
pub struct NodePool {
    /// Shared tree storage backing all queries.
    inner: Arc<Tree>,
    /// Unique identity for this pool instance (`pool_<counter>`), used to
    /// namespace cache keys in the query engines.
    /// Skipped by serde — NOTE(review): a deserialized pool gets an empty
    /// key, so cache keys from different deserialized pools can collide;
    /// confirm pools are re-keyed after deserialization.
    #[serde(skip)]
    key: String,
}
29
// SAFETY: NOTE(review): these manual impls assert thread-safety for
// `NodePool` (an `Arc<Tree>` plus a `String`). If `Tree` is itself
// `Send + Sync`, both impls are redundant (the compiler derives them);
// if it is not, they are unsound. Audit `Tree`'s interior (e.g. any
// `Rc`/`RefCell`) before relying on cross-thread use.
unsafe impl Send for NodePool {}
unsafe impl Sync for NodePool {}
32
33impl NodePool {
34 pub fn new(inner: Arc<Tree>) -> Arc<NodePool> {
35 let id = POOL_ID_COUNTER.fetch_add(1, Ordering::SeqCst);
36 let pool = Self { inner, key: format!("pool_{}", id) };
37 let pool: Arc<NodePool> = Arc::new(pool);
38
39 pool
40 }
41
    /// Returns this pool's unique cache-namespacing key.
    pub fn key(&self) -> &str {
        &self.key
    }

    /// Total number of nodes, summed over all storage shards.
    pub fn size(&self) -> usize {
        self.inner.nodes.iter().map(|i| i.values().len()).sum()
    }

    /// Returns the root node.
    ///
    /// NOTE(review): uses indexing on the tree, which presumably panics if
    /// the root id is missing from storage — confirm `Tree` guarantees it.
    pub fn root(&self) -> Arc<Node> {
        self.inner[&self.inner.root_id].clone()
    }

    /// Returns the id of the root node.
    pub fn root_id(&self) -> &NodeId {
        &self.inner.root_id
    }

    /// Borrows the underlying shared tree.
    pub fn get_inner(&self) -> &Arc<Tree> {
        &self.inner
    }
63
64 pub fn from(nodes: NodeEnum) -> Arc<NodePool> {
75 let id = POOL_ID_COUNTER.fetch_add(1, Ordering::SeqCst);
76 let pool = Self {
77 inner: Arc::new(Tree::from(nodes)),
78 key: format!("pool_{}", id),
79 };
80 let pool: Arc<NodePool> = Arc::new(pool);
81 pool
82 }
83
    /// Looks up a node by id, if present.
    pub fn get_node(
        &self,
        id: &NodeId,
    ) -> Option<Arc<Node>> {
        self.inner.get_node(id)
    }
    /// Looks up the parent node of `id`, if any.
    pub fn get_parent_node(
        &self,
        id: &NodeId,
    ) -> Option<Arc<Node>> {
        self.inner.get_parent_node(id)
    }

    /// Whether a node with `id` exists in the pool.
    pub fn contains_node(
        &self,
        id: &NodeId,
    ) -> bool {
        self.inner.contains_node(id)
    }

    /// Returns the ordered child ids of `parent_id`, or `None` when the
    /// parent itself does not exist (an existing node with no children
    /// yields an empty vector, not `None`).
    pub fn children(
        &self,
        parent_id: &NodeId,
    ) -> Option<im::Vector<NodeId>> {
        self.get_node(parent_id).map(|n| n.content.clone())
    }
117
118 pub fn descendants(
120 &self,
121 parent_id: &NodeId,
122 ) -> Vec<Arc<Node>> {
123 let mut result: Vec<Arc<Node>> = Vec::new();
124 self._collect_descendants(parent_id, &mut result);
125 result
126 }
127
128 fn _collect_descendants(
129 &self,
130 parent_id: &NodeId,
131 result: &mut Vec<Arc<Node>>,
132 ) {
133 if let Some(children) = self.children(parent_id) {
134 for child_id in &children {
135 if let Some(child) = self.get_node(child_id) {
136 result.push(child);
137 self._collect_descendants(child_id, result);
138 }
139 }
140 }
141 }
142 pub fn for_each<F>(
143 &self,
144 id: &NodeId,
145 f: F,
146 ) where
147 F: Fn(&Arc<Node>),
148 {
149 if let Some(children) = self.children(id) {
150 for child_id in &children {
151 if let Some(child) = self.get_node(child_id) {
152 f(&child);
153 }
154 }
155 }
156 }
    /// Returns the parent id of `child_id`, or `None` for the root or an
    /// id not present in the parent map.
    pub fn parent_id(
        &self,
        child_id: &NodeId,
    ) -> Option<&NodeId> {
        self.inner.parent_map.get(child_id)
    }

    /// Returns the ancestors of `child_id`, nearest parent first.
    ///
    /// Stops (silently) at the first ancestor id that has no backing node.
    pub fn ancestors(
        &self,
        child_id: &NodeId,
    ) -> Vec<Arc<Node>> {
        let mut chain = Vec::new();
        let mut current_id = child_id;
        while let Some(parent_id) = self.parent_id(current_id) {
            if let Some(parent) = self.get_node(parent_id) {
                chain.push(parent);
                current_id = parent_id;
            } else {
                break;
            }
        }
        chain
    }

    /// Validates parent/child consistency: every parent referenced in
    /// `parent_map` must exist, and must actually list the child in its
    /// own `content`.
    pub fn validate_hierarchy(&self) -> PoolResult<()> {
        for (child_id, parent_id) in &self.inner.parent_map {
            // Parent referenced but not stored -> orphaned child.
            if !self.contains_node(parent_id) {
                return Err(error_helpers::orphan_node(child_id.clone()));
            }

            // Parent exists but does not list this child -> broken link.
            if let Some(children) = self.children(parent_id) {
                if !children.contains(child_id) {
                    return Err(error_helpers::invalid_parenting(
                        child_id.clone(),
                        parent_id.clone(),
                    ));
                }
            }
        }
        Ok(())
    }
203
204 pub fn filter_nodes<P>(
207 &self,
208 predicate: P,
209 ) -> Vec<Arc<Node>>
210 where
211 P: Fn(&Node) -> bool,
212 {
213 self.get_all_nodes().into_iter().filter(|n| predicate(n)).collect()
214 }
215 pub fn find_node<P>(
217 &self,
218 predicate: P,
219 ) -> Option<Arc<Node>>
220 where
221 P: Fn(&Node) -> bool,
222 {
223 self.get_all_nodes().into_iter().find(|n| predicate(n))
224 }
225
226 pub fn get_node_depth(
236 &self,
237 node_id: &NodeId,
238 ) -> Option<usize> {
239 let mut depth = 0;
240 let mut current_id = node_id;
241
242 while let Some(parent_id) = self.parent_id(current_id) {
243 depth += 1;
244 current_id = parent_id;
245 }
246
247 Some(depth)
248 }
249
    /// Returns the id path from the root down to `node_id`, inclusive.
    ///
    /// NOTE(review): for an id not present in the pool this still returns
    /// `vec![node_id]`; `get_lowest_common_ancestor` currently relies on
    /// the path always being non-empty — keep that in mind before changing
    /// this behavior.
    pub fn get_node_path(
        &self,
        node_id: &NodeId,
    ) -> Vec<NodeId> {
        let mut path = Vec::new();
        let mut current_id = node_id;

        // Record each id while climbing toward the root.
        while let Some(parent_id) = self.parent_id(current_id) {
            path.push(current_id.clone());
            current_id = parent_id;
        }
        path.push(current_id.clone());
        // Collected bottom-up; flip to root-first order.
        path.reverse();

        path
    }
275 pub fn resolve(
277 &self,
278 node_id: &NodeId,
279 ) -> Vec<Arc<Node>> {
280 let mut result = Vec::new();
281 let mut current_id = node_id;
282
283 loop {
285 if let Some(node) = self.get_node(current_id) {
286 result.push(node);
287 }
288
289 if let Some(parent_id) = self.parent_id(current_id) {
290 current_id = parent_id;
291 } else {
292 break;
294 }
295 }
296
297 result.reverse();
299 result
300 }
301
302 pub fn is_leaf(
312 &self,
313 node_id: &NodeId,
314 ) -> bool {
315 if let Some(children) = self.children(node_id) {
316 children.is_empty()
317 } else {
318 true
319 }
320 }
321
322 pub fn get_left_siblings(
324 &self,
325 node_id: &NodeId,
326 ) -> Vec<NodeId> {
327 if let Some(parent_id) = self.parent_id(node_id) {
328 if let Some(siblings) = self.children(parent_id) {
329 if let Some(index) =
330 siblings.iter().position(|id| id == node_id)
331 {
332 return siblings.iter().take(index).cloned().collect();
333 } else {
334 eprintln!(
336 "Warning: Node {:?} not found in parent's children list",
337 node_id
338 );
339 }
340 }
341 }
342 Vec::new()
343 }
344 pub fn get_right_siblings(
346 &self,
347 node_id: &NodeId,
348 ) -> Vec<NodeId> {
349 if let Some(parent_id) = self.parent_id(node_id) {
350 if let Some(siblings) = self.children(parent_id) {
351 if let Some(index) =
352 siblings.iter().position(|id| id == node_id)
353 {
354 return siblings.iter().skip(index + 1).cloned().collect();
355 } else {
356 eprintln!(
358 "Warning: Node {:?} not found in parent's children list",
359 node_id
360 );
361 }
362 }
363 }
364 Vec::new()
365 }
366 pub fn get_left_nodes(
368 &self,
369 node_id: &NodeId,
370 ) -> Vec<Arc<Node>> {
371 let siblings = self.get_left_siblings(node_id);
372 let mut result = Vec::new();
373 for sibling_id in siblings {
374 if let Some(node) = self.get_node(&sibling_id) {
375 result.push(node);
376 }
377 }
378 result
379 }
380
381 pub fn get_right_nodes(
383 &self,
384 node_id: &NodeId,
385 ) -> Vec<Arc<Node>> {
386 let siblings = self.get_right_siblings(node_id);
387 let mut result = Vec::new();
388 for sibling_id in siblings {
389 if let Some(node) = self.get_node(&sibling_id) {
390 result.push(node);
391 }
392 }
393 result
394 }
395
396 pub fn get_all_siblings(
406 &self,
407 node_id: &NodeId,
408 ) -> Vec<NodeId> {
409 if let Some(parent_id) = self.parent_id(node_id) {
410 if let Some(children) = self.children(parent_id) {
411 return children.iter().cloned().collect();
412 }
413 }
414 Vec::new()
415 }
416
417 pub fn get_subtree_size(
427 &self,
428 node_id: &NodeId,
429 ) -> usize {
430 let mut size = 1; if let Some(children) = self.children(node_id) {
432 for child_id in &children {
433 size += self.get_subtree_size(child_id);
434 }
435 }
436 size
437 }
438
439 pub fn is_ancestor(
450 &self,
451 ancestor_id: &NodeId,
452 descendant_id: &NodeId,
453 ) -> bool {
454 let mut current_id = descendant_id;
455 while let Some(parent_id) = self.parent_id(current_id) {
456 if parent_id == ancestor_id {
457 return true;
458 }
459 current_id = parent_id;
460 }
461 false
462 }
463
464 pub fn get_lowest_common_ancestor(
475 &self,
476 node1_id: &NodeId,
477 node2_id: &NodeId,
478 ) -> Option<NodeId> {
479 let path1 = self.get_node_path(node1_id);
480 let path2 = self.get_node_path(node2_id);
481
482 for ancestor_id in path1.iter().rev() {
483 if path2.contains(ancestor_id) {
484 return Some(ancestor_id.clone());
485 }
486 }
487 None
488 }
489
    /// Runs `predicate` over every node in parallel (one rayon task per
    /// storage shard) and returns all matches. Result order follows shard
    /// layout and is not deterministic document order.
    pub fn parallel_query<P>(
        &self,
        predicate: P,
    ) -> Vec<Arc<Node>>
    where
        P: Fn(&Node) -> bool + Send + Sync,
    {
        let shards: Vec<_> = self.inner.nodes.iter().collect();

        shards
            .into_par_iter()
            .flat_map(|shard| {
                shard
                    .values()
                    .filter(|node| predicate(node))
                    .cloned()
                    .collect::<Vec<_>>()
            })
            .collect()
    }

    /// Like [`Self::parallel_query`], but hands `predicate` whole batches
    /// of up to `batch_size` nodes and concatenates whatever it returns.
    ///
    /// NOTE(review): `chunks` panics when `batch_size == 0`; the `'a`
    /// lifetime parameter is unused beyond `&'a self` and could be elided.
    pub fn parallel_batch_query<'a, P>(
        &'a self,
        batch_size: usize,
        predicate: P,
    ) -> Vec<Arc<Node>>
    where
        P: Fn(&[Arc<Node>]) -> Vec<Arc<Node>> + Send + Sync,
    {
        let shards: Vec<_> = self.inner.nodes.iter().collect();

        shards
            .into_par_iter()
            .flat_map(|shard| {
                // Snapshot the shard so it can be sliced into chunks.
                let nodes: Vec<_> = shard.values().cloned().collect();

                nodes
                    .chunks(batch_size)
                    .flat_map(|chunk| predicate(chunk))
                    .collect::<Vec<_>>()
            })
            .collect()
    }

    /// Parallel filter-then-map: applies `transform` to every node that
    /// satisfies `predicate`.
    pub fn parallel_query_map<'a, P, T, F>(
        &'a self,
        predicate: P,
        transform: F,
    ) -> Vec<T>
    where
        P: Fn(&Node) -> bool + Send + Sync,
        F: Fn(&Arc<Node>) -> T + Send + Sync,
        T: Send,
    {
        let shards: Vec<_> = self.inner.nodes.iter().collect();

        shards
            .into_par_iter()
            .flat_map(|shard| {
                shard
                    .values()
                    .filter(|node| predicate(node))
                    .map(|node| transform(node))
                    .collect::<Vec<T>>()
            })
            .collect()
    }
596
597 pub fn parallel_query_reduce<P, T, F>(
609 &self,
610 predicate: P,
611 init: T,
612 fold: F,
613 ) -> T
614 where
615 P: Fn(&Node) -> bool + Send + Sync,
616 F: Fn(T, &Arc<Node>) -> T + Send + Sync,
617 T: Send + Sync + Clone,
618 {
619 let dummy_node = Arc::new(Node::new(
620 "",
621 "".to_string(),
622 Default::default(),
623 vec![],
624 vec![],
625 ));
626
627 let shards: Vec<_> = self.inner.nodes.iter().collect();
629
630 shards
632 .into_par_iter()
633 .map(|shard| {
634 shard
636 .values()
637 .filter(|node| predicate(node))
638 .fold(init.clone(), |acc, node| fold(acc, node))
639 })
640 .reduce(|| init.clone(), |a, _b| fold(a, &dummy_node))
642 }
643
644 fn get_all_nodes(&self) -> Vec<Arc<Node>> {
646 let mut result = Vec::new();
647 for shard in &self.inner.nodes {
648 for node in shard.values() {
649 result.push(node.clone());
650 }
651 }
652 result
653 }
654}
655
/// Chainable, closure-based query builder over a [`NodePool`].
///
/// Each `by_*` call appends one predicate; terminal methods evaluate the
/// conjunction of all predicates over every node in the pool.
pub struct QueryEngine<'a> {
    pool: &'a NodePool,
    /// Accumulated predicates, all of which must hold for a node to match.
    conditions: Vec<Box<dyn Fn(&Node) -> bool + Send + Sync + 'a>>,
}

impl<'a> QueryEngine<'a> {
    /// Starts an empty query (matches every node until narrowed).
    pub fn new(pool: &'a NodePool) -> Self {
        Self { pool, conditions: Vec::new() }
    }

    /// Keeps only nodes whose `type` equals `node_type`.
    pub fn by_type(
        mut self,
        node_type: &'a str,
    ) -> Self {
        let node_type = node_type.to_string();
        self.conditions.push(Box::new(move |node| node.r#type == node_type));
        self
    }

    /// Keeps only nodes whose attribute `key` equals `value`.
    pub fn by_attr(
        mut self,
        key: &'a str,
        value: &'a serde_json::Value,
    ) -> Self {
        let key = key.to_string();
        let value = value.clone();
        self.conditions.push(Box::new(move |node| {
            node.attrs.get(&key).map_or(false, |v| v == &value)
        }));
        self
    }

    /// Keeps only nodes carrying a mark of type `mark_type`.
    pub fn by_mark(
        mut self,
        mark_type: &'a str,
    ) -> Self {
        let mark_type = mark_type.to_string();
        self.conditions.push(Box::new(move |node| {
            node.marks.iter().any(|mark| mark.r#type == mark_type)
        }));
        self
    }

    /// Keeps only nodes with exactly `count` children.
    pub fn by_child_count(
        mut self,
        count: usize,
    ) -> Self {
        self.conditions.push(Box::new(move |node| node.content.len() == count));
        self
    }

    /// Keeps only nodes at exactly `depth` (root = 0).
    /// The predicate captures a cheap clone of the pool (Arc-backed).
    pub fn by_depth(
        mut self,
        depth: usize,
    ) -> Self {
        let pool = self.pool.clone();
        self.conditions.push(Box::new(move |node| {
            pool.get_node_depth(&node.id).map_or(false, |d| d == depth)
        }));
        self
    }

    /// Keeps only nodes that have an ancestor of type `ancestor_type`.
    /// Walks the ancestor chain per node — O(depth) per candidate.
    pub fn by_ancestor_type(
        mut self,
        ancestor_type: &'a str,
    ) -> Self {
        let pool = self.pool.clone();
        let ancestor_type = ancestor_type.to_string();
        self.conditions.push(Box::new(move |node| {
            pool.ancestors(&node.id)
                .iter()
                .any(|ancestor| ancestor.r#type == ancestor_type)
        }));
        self
    }

    /// Keeps only nodes that have a descendant of type `descendant_type`.
    /// Walks the whole subtree per node — potentially expensive.
    pub fn by_descendant_type(
        mut self,
        descendant_type: &'a str,
    ) -> Self {
        let pool = self.pool.clone();
        let descendant_type = descendant_type.to_string();
        self.conditions.push(Box::new(move |node| {
            pool.descendants(&node.id)
                .iter()
                .any(|descendant| descendant.r#type == descendant_type)
        }));
        self
    }

    /// Returns all matching nodes (sequential scan).
    pub fn find_all(&self) -> Vec<Arc<Node>> {
        self.pool
            .get_all_nodes()
            .into_iter()
            .filter(|node| {
                self.conditions.iter().all(|condition| condition(node))
            })
            .collect()
    }

    /// Returns the first matching node in shard order.
    pub fn find_first(&self) -> Option<Arc<Node>> {
        self.pool.get_all_nodes().into_iter().find(|node| {
            self.conditions.iter().all(|condition| condition(node))
        })
    }

    /// Counts matching nodes (sequential scan).
    pub fn count(&self) -> usize {
        self.pool
            .get_all_nodes()
            .into_iter()
            .filter(|node| {
                self.conditions.iter().all(|condition| condition(node))
            })
            .count()
    }

    /// Returns all matching nodes using the pool's parallel query.
    pub fn parallel_find_all(&self) -> Vec<Arc<Node>> {
        let conditions = &self.conditions;
        self.pool.parallel_query(|node| {
            conditions.iter().all(|condition| condition(node))
        })
    }

    /// Returns *some* matching node — `find_any` is nondeterministic, so
    /// this may differ from `find_first` when multiple nodes match.
    pub fn parallel_find_first(&self) -> Option<Arc<Node>> {
        let conditions = &self.conditions;
        self.pool.get_all_nodes().into_par_iter().find_any(move |node| {
            conditions.iter().all(|condition| condition(node))
        })
    }

    /// Counts matching nodes in parallel.
    pub fn parallel_count(&self) -> usize {
        let conditions = &self.conditions;
        self.pool
            .get_all_nodes()
            .into_par_iter()
            .filter(move |node| {
                conditions.iter().all(|condition| condition(node))
            })
            .count()
    }
}
812
impl NodePool {
    /// Starts a chainable [`QueryEngine`] over this pool.
    pub fn query(&self) -> QueryEngine {
        QueryEngine::new(self)
    }
}
819
/// Configuration for the result cache of [`OptimizedQueryEngine`].
#[derive(Clone, Debug)]
pub struct QueryCacheConfig {
    /// Maximum number of cached query results. Must be non-zero when
    /// `enabled` is true (the engine unwraps `NonZeroUsize::new`).
    pub capacity: usize,
    /// Whether query-result caching is enabled at all.
    pub enabled: bool,
}

impl Default for QueryCacheConfig {
    /// Caching on, up to 1000 entries.
    fn default() -> Self {
        Self { capacity: 1000, enabled: true }
    }
}

/// Query engine that eagerly indexes the whole pool by node type, depth
/// and mark at construction time, plus an optional LRU result cache.
pub struct OptimizedQueryEngine {
    pool: Arc<NodePool>,
    /// LRU cache of full query results, keyed by a hashed query id.
    cache: Option<LruCache<String, Vec<Arc<Node>>>>,
    /// node type -> nodes of that type.
    type_index: HashMap<String, Vec<Arc<Node>>>,
    /// depth -> nodes at that depth.
    depth_index: HashMap<usize, Vec<Arc<Node>>>,
    /// mark type -> nodes carrying that mark.
    mark_index: HashMap<String, Vec<Arc<Node>>>,
}
843
844impl OptimizedQueryEngine {
    /// Builds the engine and eagerly indexes the entire pool.
    ///
    /// Panics if `config.enabled` is true and `config.capacity == 0`
    /// (`NonZeroUsize::new(..).unwrap()`).
    pub fn new(
        pool: &NodePool,
        config: QueryCacheConfig,
    ) -> Self {
        let mut engine = Self {
            // The engine keeps its own clone of the pool (cheap: Arc-backed).
            pool: Arc::new(pool.clone()),
            cache: if config.enabled {
                Some(LruCache::new(NonZeroUsize::new(config.capacity).unwrap()))
            } else {
                None
            },
            type_index: HashMap::new(),
            depth_index: HashMap::new(),
            mark_index: HashMap::new(),
        };
        let start = Instant::now();
        engine.build_indices();
        let duration = start.elapsed();
        // Log line reads "index build finished, elapsed: ..".
        println!("索引构建完成,耗时: {:?}", duration);
        engine
    }

    /// Populates the type/depth/mark indices.
    ///
    /// Strategy: snapshot all nodes, sort by id for a deterministic chunk
    /// layout, index chunks in parallel into thread-local maps, then merge
    /// each local map into a shared `Mutex`-protected map.
    fn build_indices(&mut self) {
        use rayon::prelude::*;
        use std::collections::HashMap;
        use std::sync::Mutex;

        use std::sync::Arc;
        let node_count = self.pool.size();

        // Shared result maps; capacities are rough heuristics only.
        let type_index =
            Arc::new(Mutex::new(HashMap::with_capacity(node_count / 5)));
        let depth_index = Arc::new(Mutex::new(HashMap::with_capacity(10)));
        let mark_index =
            Arc::new(Mutex::new(HashMap::with_capacity(node_count / 10)));

        let optimal_shard_size = 1000;
        let mut all_nodes: Vec<_> = self
            .pool
            .inner
            .nodes
            .iter()
            .flat_map(|shard| shard.values().cloned())
            .collect();

        // Deterministic ordering so chunking is stable across runs.
        all_nodes.sort_by(|a, b| a.id.cmp(&b.id));

        let shards: Vec<_> = all_nodes.chunks(optimal_shard_size).collect();

        shards.into_par_iter().for_each(|shard| {
            // Thread-local accumulators; merged under the mutexes below.
            let mut local_type_index = HashMap::with_capacity(shard.len() / 5);
            let mut local_depth_index = HashMap::with_capacity(5);
            let mut local_mark_index = HashMap::with_capacity(shard.len() / 10);

            let mut type_nodes = Vec::with_capacity(shard.len());
            let mut depth_nodes = Vec::with_capacity(shard.len());
            let mut mark_nodes = Vec::with_capacity(shard.len() * 2);

            // First pass: flatten each node into (key, node) pairs.
            for node in shard {
                type_nodes.push((node.r#type.clone(), Arc::clone(node)));

                if let Some(depth) = self.pool.get_node_depth(&node.id) {
                    depth_nodes.push((depth, Arc::clone(node)));
                }

                for mark in &node.marks {
                    mark_nodes.push((mark.r#type.clone(), Arc::clone(node)));
                }
            }

            // Second pass: bucket the pairs into the local maps.
            for (type_name, node) in type_nodes {
                local_type_index
                    .entry(type_name)
                    .or_insert_with(|| Vec::with_capacity(shard.len() / 5))
                    .push(node);
            }

            for (depth, node) in depth_nodes {
                local_depth_index
                    .entry(depth)
                    .or_insert_with(|| Vec::with_capacity(shard.len() / 10))
                    .push(node);
            }

            for (mark_type, node) in mark_nodes {
                local_mark_index
                    .entry(mark_type)
                    .or_insert_with(|| Vec::with_capacity(shard.len() / 10))
                    .push(node);
            }

            // Merge phase: each map is locked only for its own merge.
            {
                let mut type_idx = type_index.lock().unwrap();
                for (k, v) in local_type_index {
                    type_idx
                        .entry(k)
                        .or_insert_with(|| Vec::with_capacity(v.len()))
                        .extend(v);
                }
            }
            {
                let mut depth_idx = depth_index.lock().unwrap();
                for (k, v) in local_depth_index {
                    depth_idx
                        .entry(k)
                        .or_insert_with(|| Vec::with_capacity(v.len()))
                        .extend(v);
                }
            }
            {
                let mut mark_idx = mark_index.lock().unwrap();
                for (k, v) in local_mark_index {
                    mark_idx
                        .entry(k)
                        .or_insert_with(|| Vec::with_capacity(v.len()))
                        .extend(v);
                }
            }
        });

        // All rayon tasks finished, so each Arc has one strong reference
        // and try_unwrap cannot fail here.
        self.type_index =
            Arc::try_unwrap(type_index).unwrap().into_inner().unwrap();
        self.depth_index =
            Arc::try_unwrap(depth_index).unwrap().into_inner().unwrap();
        self.mark_index =
            Arc::try_unwrap(mark_index).unwrap().into_inner().unwrap();
    }

    /// All nodes of `node_type` (indexed; no scan).
    pub fn by_type(
        &self,
        node_type: &str,
    ) -> Vec<Arc<Node>> {
        self.type_index.get(node_type).cloned().unwrap_or_default()
    }

    /// All nodes at `depth` (indexed; no scan).
    pub fn by_depth(
        &self,
        depth: usize,
    ) -> Vec<Arc<Node>> {
        self.depth_index.get(&depth).cloned().unwrap_or_default()
    }

    /// All nodes carrying a mark of `mark_type` (indexed; no scan).
    pub fn by_mark(
        &self,
        mark_type: &str,
    ) -> Vec<Arc<Node>> {
        self.mark_index.get(mark_type).cloned().unwrap_or_default()
    }
1014
1015 pub fn query(
1017 &mut self,
1018 conditions: Vec<Box<dyn Fn(&Node) -> bool + Send + Sync>>,
1019 ) -> Vec<Arc<Node>> {
1020 let cache_key = self.generate_query_cache_key(&conditions);
1022
1023 if let Some(cache) = &self.cache {
1025 if let Some(cached) = cache.peek(&cache_key) {
1026 return cached.clone();
1027 }
1028 }
1029
1030 let mut candidates: Option<Vec<Arc<Node>>> = None;
1032
1033 for condition in &conditions {
1035 if let Some(indexed) = self.get_indexed_nodes(condition) {
1036 candidates = match candidates {
1037 None => Some(indexed),
1038 Some(existing) => {
1039 Some(self.intersect_nodes(&existing, &indexed))
1040 },
1041 };
1042 }
1043 }
1044
1045 let result: Vec<Arc<Node>> = match candidates {
1046 Some(nodes) => {
1047 nodes
1049 .par_iter()
1050 .filter(|node| {
1051 conditions.iter().all(|condition| condition(node))
1052 })
1053 .cloned()
1054 .collect()
1055 },
1056 None => {
1057 self.pool
1059 .parallel_query(|node| {
1060 conditions.iter().all(|condition| condition(node))
1061 })
1062 .into_iter()
1063 .collect()
1064 },
1065 };
1066
1067 if let Some(cache) = &mut self.cache {
1069 cache.put(cache_key, result.clone());
1070 }
1071
1072 result
1073 }
1074
1075 fn generate_query_cache_key(
1077 &self,
1078 conditions: &[Box<dyn Fn(&Node) -> bool + Send + Sync>],
1079 ) -> String {
1080 use std::collections::hash_map::DefaultHasher;
1081 use std::hash::{Hash, Hasher};
1082
1083 let mut hasher = DefaultHasher::new();
1084
1085 conditions.len().hash(&mut hasher);
1087 self.pool.key().hash(&mut hasher);
1088
1089 for (i, _condition) in conditions.iter().enumerate() {
1091 i.hash(&mut hasher);
1093 std::ptr::addr_of!(_condition).hash(&mut hasher);
1094 }
1095
1096 format!("query_{:x}", hasher.finish())
1097 }
1098
    /// Heuristically picks a prebuilt index for `condition` by probing the
    /// condition against a single sample node from each index bucket.
    ///
    /// NOTE(review): matching one sample node cannot prove the condition
    /// is equivalent to the index — the returned candidate set may *omit*
    /// true matches (the caller only re-filters candidates, it does not
    /// widen them), so results can be incomplete. Also, `type_nodes[0]` /
    /// `depth_nodes[0]` assume those buckets are non-empty, which holds
    /// for buckets as built by `build_indices`. Confirm this heuristic is
    /// intended before relying on it.
    fn get_indexed_nodes(
        &self,
        condition: &Box<dyn Fn(&Node) -> bool + Send + Sync>,
    ) -> Option<Vec<Arc<Node>>> {
        // Probe the "document" type bucket.
        if let Some(type_nodes) = self.type_index.get("document") {
            if condition(&type_nodes[0]) {
                return Some(type_nodes.clone());
            }
        }

        // Probe the depth-0 (root) bucket.
        if let Some(depth_nodes) = self.depth_index.get(&0) {
            if condition(&depth_nodes[0]) {
                return Some(depth_nodes.clone());
            }
        }

        // Probe each mark bucket in turn.
        for (_, mark_nodes) in &self.mark_index {
            if !mark_nodes.is_empty() && condition(&mark_nodes[0]) {
                return Some(mark_nodes.clone());
            }
        }

        None
    }
1127
1128 fn intersect_nodes(
1130 &self,
1131 nodes1: &[Arc<Node>],
1132 nodes2: &[Arc<Node>],
1133 ) -> Vec<Arc<Node>> {
1134 let set1: HashSet<_> = nodes1.iter().map(|n| n.id.as_str()).collect();
1135 nodes2
1136 .iter()
1137 .filter(|node| set1.contains(node.id.as_str()))
1138 .cloned()
1139 .collect()
1140 }
1141}
1142
1143impl NodePool {
1144 pub fn optimized_query(
1146 &self,
1147 config: QueryCacheConfig,
1148 ) -> OptimizedQueryEngine {
1149 let engine = OptimizedQueryEngine::new(self, config);
1150 engine
1151 }
1152}
1153
/// Deep-copies the engine: the indices and cache are cloned, the pool
/// `Arc` is merely re-referenced.
/// NOTE(review): if `LruCache` implements `Clone` for these key/value
/// types, a derived `Clone` would suffice — confirm against the `lru`
/// crate version in use.
impl Clone for OptimizedQueryEngine {
    fn clone(&self) -> Self {
        Self {
            pool: self.pool.clone(),
            cache: self.cache.clone(),
            type_index: self.type_index.clone(),
            depth_index: self.depth_index.clone(),
            mark_index: self.mark_index.clone(),
        }
    }
}

/// Configuration for [`LazyQueryEngine`].
#[derive(Clone, Debug)]
pub struct LazyQueryConfig {
    /// Capacity of the full-result query cache. Must be non-zero.
    pub cache_capacity: usize,
    /// Capacity of each lazily built per-key index cache. Must be non-zero.
    pub index_cache_capacity: usize,
    /// Whether the full-result cache is enabled.
    pub cache_enabled: bool,
    /// How many times a key must be queried before its index is trusted
    /// for candidate narrowing.
    pub index_build_threshold: usize,
}

impl Default for LazyQueryConfig {
    /// Caching on; 1000 results, 100 indices, build after 5 uses.
    fn default() -> Self {
        Self {
            cache_capacity: 1000,
            index_cache_capacity: 100,
            cache_enabled: true,
            index_build_threshold: 5,
        }
    }
}
1190
/// Per-key usage statistics, used by [`LazyQueryEngine`] to decide when a
/// key has been queried often enough to justify building an index.
///
/// Fields are `pub` so consumers of `QueryStatsSummary` (whose maps of
/// `QueryStats` are public) can actually read them — previously the type
/// was exposed but its fields were inaccessible outside this module.
#[derive(Debug, Clone)]
pub struct QueryStats {
    /// How many times this key has been queried.
    pub count: usize,
    /// When this key was last queried.
    pub last_query: Instant,
}
1199
/// Query engine that builds per-key indices lazily, only after a key has
/// been requested `index_build_threshold` times.
pub struct LazyQueryEngine {
    /// The pool being queried (cheap clone: Arc-backed).
    pool: Arc<NodePool>,

    /// Optional LRU cache of full query results, keyed by hashed query id.
    query_cache: Option<LruCache<String, Vec<Arc<Node>>>>,

    /// LRU caches of lazily built per-key indices.
    type_index_cache: LruCache<String, Vec<Arc<Node>>>,
    depth_index_cache: LruCache<usize, Vec<Arc<Node>>>,
    mark_index_cache: LruCache<String, Vec<Arc<Node>>>,

    /// Usage counters that drive lazy index building.
    type_query_stats: HashMap<String, QueryStats>,
    depth_query_stats: HashMap<usize, QueryStats>,
    mark_query_stats: HashMap<String, QueryStats>,

    config: LazyQueryConfig,
}

// SAFETY: NOTE(review): manual thread-safety assertions. Every cache and
// stats mutation goes through `&mut self`, but if the auto traits do not
// already hold for the field types these impls are unsound rather than
// merely redundant — re-audit instead of assuming.
unsafe impl Send for LazyQueryEngine {}
unsafe impl Sync for LazyQueryEngine {}
1224
1225impl LazyQueryEngine {
    /// Builds an engine with empty caches and stats; no indexing happens
    /// until a key is actually queried.
    ///
    /// Panics if `cache_capacity` (when caching is enabled) or
    /// `index_cache_capacity` is zero.
    pub fn new(
        pool: &NodePool,
        config: LazyQueryConfig,
    ) -> Self {
        Self {
            pool: Arc::new(pool.clone()),
            query_cache: if config.cache_enabled {
                Some(LruCache::new(
                    NonZeroUsize::new(config.cache_capacity).unwrap(),
                ))
            } else {
                None
            },
            type_index_cache: LruCache::new(
                NonZeroUsize::new(config.index_cache_capacity).unwrap(),
            ),
            depth_index_cache: LruCache::new(
                NonZeroUsize::new(config.index_cache_capacity).unwrap(),
            ),
            mark_index_cache: LruCache::new(
                NonZeroUsize::new(config.index_cache_capacity).unwrap(),
            ),
            type_query_stats: HashMap::new(),
            depth_query_stats: HashMap::new(),
            mark_query_stats: HashMap::new(),
            config,
        }
    }

    /// Nodes of `node_type`, building and caching the index on first use.
    pub fn by_type_lazy(
        &mut self,
        node_type: &str,
    ) -> Vec<Arc<Node>> {
        self.update_type_stats(node_type);

        if let Some(cached) = self.type_index_cache.get(node_type) {
            return cached.clone();
        }

        let start = Instant::now();
        let nodes = self.build_type_index(node_type);
        let duration = start.elapsed();

        // Log line reads "built type index on demand, elapsed, node count".
        println!(
            "实时构建类型索引 '{}', 耗时: {:?}, 节点数: {}",
            node_type,
            duration,
            nodes.len()
        );

        self.type_index_cache.put(node_type.to_string(), nodes.clone());

        nodes
    }

    /// Nodes at `depth`, building and caching the index on first use.
    pub fn by_depth_lazy(
        &mut self,
        depth: usize,
    ) -> Vec<Arc<Node>> {
        self.update_depth_stats(depth);

        if let Some(cached) = self.depth_index_cache.get(&depth) {
            return cached.clone();
        }

        let start = Instant::now();
        let nodes = self.build_depth_index(depth);
        let duration = start.elapsed();

        // Log line reads "built depth index on demand, elapsed, node count".
        println!(
            "实时构建深度索引 {}, 耗时: {:?}, 节点数: {}",
            depth,
            duration,
            nodes.len()
        );

        self.depth_index_cache.put(depth, nodes.clone());
        nodes
    }

    /// Nodes carrying `mark_type`, building and caching the index on
    /// first use.
    pub fn by_mark_lazy(
        &mut self,
        mark_type: &str,
    ) -> Vec<Arc<Node>> {
        self.update_mark_stats(mark_type);

        if let Some(cached) = self.mark_index_cache.get(mark_type) {
            return cached.clone();
        }

        let start = Instant::now();
        let nodes = self.build_mark_index(mark_type);
        let duration = start.elapsed();

        // Log line reads "built mark index on demand, elapsed, node count".
        println!(
            "实时构建标记索引 '{}', 耗时: {:?}, 节点数: {}",
            mark_type,
            duration,
            nodes.len()
        );

        self.mark_index_cache.put(mark_type.to_string(), nodes.clone());
        nodes
    }
1337
1338 pub fn smart_query<F>(
1340 &mut self,
1341 query_name: &str,
1342 query_fn: F,
1343 ) -> Vec<Arc<Node>>
1344 where
1345 F: Fn() -> Vec<Arc<Node>>,
1346 {
1347 let cache_key = self.generate_cache_key(query_name);
1349
1350 if let Some(cache) = &self.query_cache {
1352 if let Some(cached) = cache.peek(&cache_key) {
1353 return cached.clone();
1354 }
1355 }
1356
1357 let start = Instant::now();
1359 let result = query_fn();
1360 let duration = start.elapsed();
1361
1362 println!(
1363 "执行查询 '{}', 耗时: {:?}, 结果数: {}",
1364 query_name,
1365 duration,
1366 result.len()
1367 );
1368
1369 if let Some(cache) = &mut self.query_cache {
1371 cache.put(cache_key, result.clone());
1372 }
1373
1374 result
1375 }
1376
    /// Evaluates a conjunction of [`QueryCondition`]s, narrowing the
    /// candidate set via lazily built indices for keys that have crossed
    /// the usage threshold, then re-filtering with `matches`.
    ///
    /// NOTE(review): when a `ByDepth` condition is *below* its usage
    /// threshold, no depth index narrows the candidates, and
    /// `QueryCondition::matches` always returns `true` for `ByDepth` — so
    /// the depth restriction is silently dropped and the result can be a
    /// superset of what was asked for. Confirm this is intended.
    pub fn combined_query(
        &mut self,
        conditions: &[QueryCondition],
    ) -> Vec<Arc<Node>> {
        let cache_key = self.generate_combined_cache_key(conditions);

        if let Some(cache) = &self.query_cache {
            if let Some(cached) = cache.peek(&cache_key) {
                return cached.clone();
            }
        }

        let mut candidates: Option<Vec<Arc<Node>>> = None;

        // For each indexable condition whose key is "hot" enough, fetch
        // (or lazily build) its index and intersect with prior candidates.
        for condition in conditions {
            let indexed_nodes = match condition {
                QueryCondition::ByType(type_name) => {
                    if self.should_use_type_index(type_name) {
                        Some(self.by_type_lazy(type_name))
                    } else {
                        None
                    }
                },
                QueryCondition::ByDepth(depth) => {
                    if self.should_use_depth_index(*depth) {
                        Some(self.by_depth_lazy(*depth))
                    } else {
                        None
                    }
                },
                QueryCondition::ByMark(mark_type) => {
                    if self.should_use_mark_index(mark_type) {
                        Some(self.by_mark_lazy(mark_type))
                    } else {
                        None
                    }
                },
                // These condition kinds have no index.
                QueryCondition::ByAttr { .. }
                | QueryCondition::IsLeaf
                | QueryCondition::HasChildren => None,
            };

            if let Some(indexed) = indexed_nodes {
                candidates = match candidates {
                    None => Some(indexed),
                    Some(existing) => {
                        Some(self.intersect_nodes(&existing, &indexed))
                    },
                };
            }
        }

        // Final filter over candidates, or a parallel full scan when no
        // index applied.
        let result = match candidates {
            Some(nodes) => nodes
                .into_par_iter()
                .filter(|node| conditions.iter().all(|cond| cond.matches(node)))
                .collect(),
            None => {
                self.pool.parallel_query(|node| {
                    conditions.iter().all(|cond| cond.matches(node))
                })
            },
        };

        if let Some(cache) = &mut self.query_cache {
            cache.put(cache_key, result.clone());
        }

        result
    }
1453
1454 fn update_type_stats(
1457 &mut self,
1458 type_name: &str,
1459 ) {
1460 let stats = self
1461 .type_query_stats
1462 .entry(type_name.to_string())
1463 .or_insert(QueryStats { count: 0, last_query: Instant::now() });
1464 stats.count += 1;
1465 stats.last_query = Instant::now();
1466 }
1467
1468 fn update_depth_stats(
1469 &mut self,
1470 depth: usize,
1471 ) {
1472 let stats = self
1473 .depth_query_stats
1474 .entry(depth)
1475 .or_insert(QueryStats { count: 0, last_query: Instant::now() });
1476 stats.count += 1;
1477 stats.last_query = Instant::now();
1478 }
1479
1480 fn update_mark_stats(
1481 &mut self,
1482 mark_type: &str,
1483 ) {
1484 let stats = self
1485 .mark_query_stats
1486 .entry(mark_type.to_string())
1487 .or_insert(QueryStats { count: 0, last_query: Instant::now() });
1488 stats.count += 1;
1489 stats.last_query = Instant::now();
1490 }
1491
1492 fn should_use_type_index(
1493 &self,
1494 type_name: &str,
1495 ) -> bool {
1496 self.type_query_stats
1497 .get(type_name)
1498 .map(|stats| stats.count >= self.config.index_build_threshold)
1499 .unwrap_or(false)
1500 }
1501
1502 fn should_use_depth_index(
1503 &self,
1504 depth: usize,
1505 ) -> bool {
1506 self.depth_query_stats
1507 .get(&depth)
1508 .map(|stats| stats.count >= self.config.index_build_threshold)
1509 .unwrap_or(false)
1510 }
1511
1512 fn should_use_mark_index(
1513 &self,
1514 mark_type: &str,
1515 ) -> bool {
1516 self.mark_query_stats
1517 .get(mark_type)
1518 .map(|stats| stats.count >= self.config.index_build_threshold)
1519 .unwrap_or(false)
1520 }
1521
    /// Scans the pool (in parallel) for nodes of `node_type`.
    fn build_type_index(
        &self,
        node_type: &str,
    ) -> Vec<Arc<Node>> {
        self.pool.parallel_query(|node| node.r#type == node_type)
    }

    /// Scans the pool (in parallel) for nodes at `target_depth`.
    fn build_depth_index(
        &self,
        target_depth: usize,
    ) -> Vec<Arc<Node>> {
        self.pool.parallel_query(|node| {
            self.pool
                .get_node_depth(&node.id)
                .map(|depth| depth == target_depth)
                .unwrap_or(false)
        })
    }

    /// Scans the pool (in parallel) for nodes carrying `mark_type`.
    fn build_mark_index(
        &self,
        mark_type: &str,
    ) -> Vec<Arc<Node>> {
        self.pool.parallel_query(|node| {
            node.marks.iter().any(|mark| mark.r#type == mark_type)
        })
    }

    /// Cache key for a named `smart_query`, namespaced by the pool key.
    fn generate_cache_key(
        &self,
        query_name: &str,
    ) -> String {
        use std::collections::hash_map::DefaultHasher;
        use std::hash::{Hash, Hasher};

        let mut hasher = DefaultHasher::new();
        query_name.hash(&mut hasher);
        self.pool.key().hash(&mut hasher);
        format!("query_{:x}", hasher.finish())
    }

    /// Cache key for a `combined_query`: hashes every condition's stable
    /// `cache_key()` string plus the pool key.
    fn generate_combined_cache_key(
        &self,
        conditions: &[QueryCondition],
    ) -> String {
        use std::collections::hash_map::DefaultHasher;
        use std::hash::{Hash, Hasher};

        let mut hasher = DefaultHasher::new();
        for condition in conditions {
            condition.cache_key().hash(&mut hasher);
        }
        self.pool.key().hash(&mut hasher);
        format!("combined_{:x}", hasher.finish())
    }
1577
1578 fn intersect_nodes(
1579 &self,
1580 nodes1: &[Arc<Node>],
1581 nodes2: &[Arc<Node>],
1582 ) -> Vec<Arc<Node>> {
1583 let set1: HashSet<_> = nodes1.iter().map(|n| n.id.as_str()).collect();
1584 nodes2
1585 .iter()
1586 .filter(|node| set1.contains(node.id.as_str()))
1587 .cloned()
1588 .collect()
1589 }
1590
    /// Snapshot of all per-key usage stats plus current cache sizes.
    pub fn get_query_stats(&self) -> QueryStatsSummary {
        QueryStatsSummary {
            type_queries: self.type_query_stats.clone(),
            depth_queries: self.depth_query_stats.clone(),
            mark_queries: self.mark_query_stats.clone(),
            cache_hit_rates: self.calculate_cache_hit_rates(),
        }
    }

    /// Current entry counts of each cache.
    /// NOTE(review): despite the name, these are sizes, not hit rates —
    /// no hit/miss counters are tracked.
    fn calculate_cache_hit_rates(&self) -> CacheHitRates {
        CacheHitRates {
            query_cache_size: self
                .query_cache
                .as_ref()
                .map(|c| c.len())
                .unwrap_or(0),
            type_index_cache_size: self.type_index_cache.len(),
            depth_index_cache_size: self.depth_index_cache.len(),
            mark_index_cache_size: self.mark_index_cache.len(),
        }
    }
1613}
1614
/// A single, data-described query predicate (used by `combined_query`).
#[derive(Debug, Clone)]
pub enum QueryCondition {
    ByType(String),
    ByDepth(usize),
    ByMark(String),
    ByAttr { key: String, value: serde_json::Value },
    IsLeaf,
    HasChildren,
}

impl QueryCondition {
    /// Whether `node` satisfies this condition, judged from the node alone.
    pub fn matches(
        &self,
        node: &Node,
    ) -> bool {
        match self {
            QueryCondition::ByType(type_name) => node.r#type == *type_name,
            // NOTE(review): depth cannot be computed from the node alone
            // (it needs the pool), so ByDepth is vacuously true here. When
            // no depth index constrained the candidates upstream, a depth
            // restriction is silently ignored — confirm this is intended.
            QueryCondition::ByDepth(_) => true,
            QueryCondition::ByMark(mark_type) => {
                node.marks.iter().any(|mark| mark.r#type == *mark_type)
            },
            QueryCondition::ByAttr { key, value } => {
                node.attrs.get(key).map_or(false, |v| v == value)
            },
            QueryCondition::IsLeaf => node.content.is_empty(),
            QueryCondition::HasChildren => !node.content.is_empty(),
        }
    }

    /// Stable string form used for building combined cache keys.
    pub fn cache_key(&self) -> String {
        match self {
            QueryCondition::ByType(t) => format!("type_{}", t),
            QueryCondition::ByDepth(d) => format!("depth_{}", d),
            QueryCondition::ByMark(m) => format!("mark_{}", m),
            QueryCondition::ByAttr { key, value } => {
                format!(
                    "attr_{}_{}",
                    key,
                    serde_json::to_string(value).unwrap_or_default()
                )
            },
            QueryCondition::IsLeaf => "is_leaf".to_string(),
            QueryCondition::HasChildren => "has_children".to_string(),
        }
    }
}

/// Snapshot of the lazy engine's usage statistics.
#[derive(Debug)]
pub struct QueryStatsSummary {
    pub type_queries: HashMap<String, QueryStats>,
    pub depth_queries: HashMap<usize, QueryStats>,
    pub mark_queries: HashMap<String, QueryStats>,
    pub cache_hit_rates: CacheHitRates,
}

/// Current cache entry counts (not true hit rates; see
/// `calculate_cache_hit_rates`).
#[derive(Debug)]
pub struct CacheHitRates {
    pub query_cache_size: usize,
    pub type_index_cache_size: usize,
    pub depth_index_cache_size: usize,
    pub mark_index_cache_size: usize,
}
1680
impl NodePool {
    /// Builds a [`LazyQueryEngine`] over this pool; no indexing happens
    /// until keys are actually queried.
    pub fn lazy_query(
        &self,
        config: LazyQueryConfig,
    ) -> LazyQueryEngine {
        LazyQueryEngine::new(self, config)
    }
}
1690
1691