1use crate::error::PoolResult;
2use crate::{node_type::NodeEnum, tree::Tree};
3
4use super::{error::error_helpers, node::Node, types::NodeId};
5use serde::{Deserialize, Serialize};
6use std::time::Instant;
7use std::{sync::Arc};
8use rayon::prelude::*;
9use std::marker::Sync;
10use std::collections::{HashMap, HashSet};
11use lru::LruCache;
12use std::num::NonZeroUsize;
13use std::sync::atomic::{AtomicUsize, Ordering};
14
/// Monotonically increasing counter used to mint a process-unique key for
/// every `NodePool` created via `new` / `from`.
static POOL_ID_COUNTER: AtomicUsize = AtomicUsize::new(0);

/// Owned, thread-safe predicate over a `Node` (used by `OptimizedQueryEngine`).
type NodeCondition = Box<dyn Fn(&Node) -> bool + Send + Sync>;
/// Lifetime-bounded predicate over a `Node` (used by the borrowing `QueryEngine`).
type NodeConditionRef<'a> = Box<dyn Fn(&Node) -> bool + Send + Sync + 'a>;
21
/// Read-only facade over an immutable [`Tree`].
///
/// Cloning is cheap (the tree is behind an `Arc`). Each pool carries a
/// process-unique `key` used to namespace query-cache entries.
#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
pub struct NodePool {
    inner: Arc<Tree>,
    // Process-unique identifier ("pool_{n}") used in cache keys.
    key: String,
}
32
33impl NodePool {
34 pub fn new(inner: Arc<Tree>) -> Arc<NodePool> {
35 let id = POOL_ID_COUNTER.fetch_add(1, Ordering::SeqCst);
36 let pool = Self { inner, key: format!("pool_{id}") };
37 let pool: Arc<NodePool> = Arc::new(pool);
38
39 pool
40 }
41
    /// Process-unique cache-key prefix for this pool ("pool_{n}").
    pub fn key(&self) -> &str {
        &self.key
    }

    /// Total number of nodes across all shards of the underlying tree.
    pub fn size(&self) -> usize {
        self.inner.nodes.iter().map(|i| i.values().len()).sum()
    }

    /// Root node. Indexing panics only if the tree's recorded root id is
    /// missing, which would violate `Tree`'s own invariant.
    pub fn root(&self) -> Arc<Node> {
        self.inner[&self.inner.root_id].clone()
    }

    /// Identifier of the root node.
    pub fn root_id(&self) -> &NodeId {
        &self.inner.root_id
    }

    /// Borrows the shared underlying tree.
    pub fn get_inner(&self) -> &Arc<Tree> {
        &self.inner
    }
63
64 pub fn from(nodes: NodeEnum) -> Arc<NodePool> {
75 let id = POOL_ID_COUNTER.fetch_add(1, Ordering::SeqCst);
76 let pool = Self {
77 inner: Arc::new(Tree::from(nodes)),
78 key: format!("pool_{id}"),
79 };
80 let pool: Arc<NodePool> = Arc::new(pool);
81 pool
82 }
83
    /// Looks up a node by id; `None` when the id is not in the tree.
    pub fn get_node(
        &self,
        id: &NodeId,
    ) -> Option<Arc<Node>> {
        self.inner.get_node(id)
    }
    /// Looks up the parent node of `id`; `None` for the root or unknown ids.
    pub fn get_parent_node(
        &self,
        id: &NodeId,
    ) -> Option<Arc<Node>> {
        self.inner.get_parent_node(id)
    }

    /// Whether `id` exists in the tree.
    pub fn contains_node(
        &self,
        id: &NodeId,
    ) -> bool {
        self.inner.contains_node(id)
    }

    /// Child ids of `parent_id`, or `None` when the parent does not exist.
    pub fn children(
        &self,
        parent_id: &NodeId,
    ) -> Option<imbl::Vector<NodeId>> {
        self.get_node(parent_id).map(|n| n.content.clone())
    }
117
118 pub fn descendants(
120 &self,
121 parent_id: &NodeId,
122 ) -> Vec<Arc<Node>> {
123 let mut result: Vec<Arc<Node>> = Vec::new();
124 self._collect_descendants(parent_id, &mut result);
125 result
126 }
127
128 fn _collect_descendants(
129 &self,
130 parent_id: &NodeId,
131 result: &mut Vec<Arc<Node>>,
132 ) {
133 if let Some(children) = self.children(parent_id) {
134 for child_id in &children {
135 if let Some(child) = self.get_node(child_id) {
136 result.push(child);
137 self._collect_descendants(child_id, result);
138 }
139 }
140 }
141 }
142 pub fn for_each<F>(
143 &self,
144 id: &NodeId,
145 f: F,
146 ) where
147 F: Fn(&Arc<Node>),
148 {
149 if let Some(children) = self.children(id) {
150 for child_id in &children {
151 if let Some(child) = self.get_node(child_id) {
152 f(&child);
153 }
154 }
155 }
156 }
    /// Id of `child_id`'s parent, or `None` for the root / unknown ids.
    pub fn parent_id(
        &self,
        child_id: &NodeId,
    ) -> Option<&NodeId> {
        self.inner.parent_map.get(child_id)
    }
164
165 pub fn ancestors(
167 &self,
168 child_id: &NodeId,
169 ) -> Vec<Arc<Node>> {
170 let mut chain = Vec::new();
171 let mut current_id = child_id;
172 while let Some(parent_id) = self.parent_id(current_id) {
173 if let Some(parent) = self.get_node(parent_id) {
174 chain.push(parent);
175 current_id = parent_id;
176 } else {
177 break;
178 }
179 }
180 chain
181 }
182
183 pub fn validate_hierarchy(&self) -> PoolResult<()> {
185 for (child_id, parent_id) in &self.inner.parent_map {
186 if !self.contains_node(parent_id) {
188 return Err(error_helpers::orphan_node(child_id.clone()));
189 }
190
191 if let Some(children) = self.children(parent_id) {
193 if !children.contains(child_id) {
194 return Err(error_helpers::invalid_parenting(
195 child_id.clone(),
196 parent_id.clone(),
197 ));
198 }
199 }
200 }
201 Ok(())
202 }
203
204 pub fn filter_nodes<P>(
207 &self,
208 predicate: P,
209 ) -> Vec<Arc<Node>>
210 where
211 P: Fn(&Node) -> bool,
212 {
213 self.get_all_nodes().into_iter().filter(|n| predicate(n)).collect()
214 }
215 pub fn find_node<P>(
217 &self,
218 predicate: P,
219 ) -> Option<Arc<Node>>
220 where
221 P: Fn(&Node) -> bool,
222 {
223 self.get_all_nodes().into_iter().find(|n| predicate(n))
224 }
225
226 pub fn get_node_depth(
236 &self,
237 node_id: &NodeId,
238 ) -> Option<usize> {
239 let mut depth = 0;
240 let mut current_id = node_id;
241
242 while let Some(parent_id) = self.parent_id(current_id) {
243 depth += 1;
244 current_id = parent_id;
245 }
246
247 Some(depth)
248 }
249
250 pub fn get_node_path(
260 &self,
261 node_id: &NodeId,
262 ) -> Vec<NodeId> {
263 let mut path = Vec::new();
264 let mut current_id = node_id;
265
266 while let Some(parent_id) = self.parent_id(current_id) {
267 path.push(current_id.clone());
268 current_id = parent_id;
269 }
270 path.push(current_id.clone());
271 path.reverse();
272
273 path
274 }
275 pub fn resolve(
277 &self,
278 node_id: &NodeId,
279 ) -> Vec<Arc<Node>> {
280 let mut result = Vec::new();
281 let mut current_id = node_id;
282
283 loop {
285 if let Some(node) = self.get_node(current_id) {
286 result.push(node);
287 }
288
289 if let Some(parent_id) = self.parent_id(current_id) {
290 current_id = parent_id;
291 } else {
292 break;
294 }
295 }
296
297 result.reverse();
299 result
300 }
301
302 pub fn is_leaf(
312 &self,
313 node_id: &NodeId,
314 ) -> bool {
315 if let Some(children) = self.children(node_id) {
316 children.is_empty()
317 } else {
318 true
319 }
320 }
321
322 pub fn get_left_siblings(
324 &self,
325 node_id: &NodeId,
326 ) -> Vec<NodeId> {
327 if let Some(parent_id) = self.parent_id(node_id) {
328 if let Some(siblings) = self.children(parent_id) {
329 if let Some(index) =
330 siblings.iter().position(|id| id == node_id)
331 {
332 return siblings.iter().take(index).cloned().collect();
333 } else {
334 eprintln!(
336 "Warning: Node {node_id:?} not found in parent's children list"
337 );
338 }
339 }
340 }
341 Vec::new()
342 }
343 pub fn get_right_siblings(
345 &self,
346 node_id: &NodeId,
347 ) -> Vec<NodeId> {
348 if let Some(parent_id) = self.parent_id(node_id) {
349 if let Some(siblings) = self.children(parent_id) {
350 if let Some(index) =
351 siblings.iter().position(|id| id == node_id)
352 {
353 return siblings.iter().skip(index + 1).cloned().collect();
354 } else {
355 eprintln!(
357 "Warning: Node {node_id:?} not found in parent's children list"
358 );
359 }
360 }
361 }
362 Vec::new()
363 }
364 pub fn get_left_nodes(
366 &self,
367 node_id: &NodeId,
368 ) -> Vec<Arc<Node>> {
369 let siblings = self.get_left_siblings(node_id);
370 let mut result = Vec::new();
371 for sibling_id in siblings {
372 if let Some(node) = self.get_node(&sibling_id) {
373 result.push(node);
374 }
375 }
376 result
377 }
378
379 pub fn get_right_nodes(
381 &self,
382 node_id: &NodeId,
383 ) -> Vec<Arc<Node>> {
384 let siblings = self.get_right_siblings(node_id);
385 let mut result = Vec::new();
386 for sibling_id in siblings {
387 if let Some(node) = self.get_node(&sibling_id) {
388 result.push(node);
389 }
390 }
391 result
392 }
393
394 pub fn get_all_siblings(
404 &self,
405 node_id: &NodeId,
406 ) -> Vec<NodeId> {
407 if let Some(parent_id) = self.parent_id(node_id) {
408 if let Some(children) = self.children(parent_id) {
409 return children.iter().cloned().collect();
410 }
411 }
412 Vec::new()
413 }
414
415 pub fn get_subtree_size(
425 &self,
426 node_id: &NodeId,
427 ) -> usize {
428 let mut size = 1; if let Some(children) = self.children(node_id) {
430 for child_id in &children {
431 size += self.get_subtree_size(child_id);
432 }
433 }
434 size
435 }
436
437 pub fn is_ancestor(
448 &self,
449 ancestor_id: &NodeId,
450 descendant_id: &NodeId,
451 ) -> bool {
452 let mut current_id = descendant_id;
453 while let Some(parent_id) = self.parent_id(current_id) {
454 if parent_id == ancestor_id {
455 return true;
456 }
457 current_id = parent_id;
458 }
459 false
460 }
461
462 pub fn get_lowest_common_ancestor(
473 &self,
474 node1_id: &NodeId,
475 node2_id: &NodeId,
476 ) -> Option<NodeId> {
477 let path1 = self.get_node_path(node1_id);
478 let path2 = self.get_node_path(node2_id);
479
480 for ancestor_id in path1.iter().rev() {
481 if path2.contains(ancestor_id) {
482 return Some(ancestor_id.clone());
483 }
484 }
485 None
486 }
487
    /// Filters all nodes with `predicate`, evaluating shards in parallel via
    /// rayon. Result order follows shard order (arbitrary within a shard).
    pub fn parallel_query<P>(
        &self,
        predicate: P,
    ) -> Vec<Arc<Node>>
    where
        P: Fn(&Node) -> bool + Send + Sync,
    {
        // Snapshot shard references so rayon can split the workload.
        let shards: Vec<_> = self.inner.nodes.iter().collect();

        shards
            .into_par_iter()
            .flat_map(|shard| {
                shard
                    .values()
                    .filter(|node| predicate(node))
                    .cloned()
                    .collect::<Vec<_>>()
            })
            .collect()
    }

    /// Runs `predicate` over fixed-size batches of each shard's nodes in
    /// parallel; `predicate` receives a chunk and returns the nodes it keeps.
    ///
    /// NOTE(review): a `batch_size` of 0 would panic inside `chunks` — confirm
    /// callers always pass a positive size.
    pub fn parallel_batch_query<P>(
        &self,
        batch_size: usize,
        predicate: P,
    ) -> Vec<Arc<Node>>
    where
        P: Fn(&[Arc<Node>]) -> Vec<Arc<Node>> + Send + Sync,
    {
        let shards: Vec<_> = self.inner.nodes.iter().collect();

        shards
            .into_par_iter()
            .flat_map(|shard| {
                // Materialize the shard so it can be chunked by slice.
                let nodes: Vec<_> = shard.values().cloned().collect();

                nodes
                    .chunks(batch_size)
                    .flat_map(&predicate)
                    .collect::<Vec<_>>()
            })
            .collect()
    }

    /// Parallel filter-then-map: keeps nodes matching `predicate` and maps
    /// each through `transform`.
    pub fn parallel_query_map<P, T, F>(
        &self,
        predicate: P,
        transform: F,
    ) -> Vec<T>
    where
        P: Fn(&Node) -> bool + Send + Sync,
        F: Fn(&Arc<Node>) -> T + Send + Sync,
        T: Send,
    {
        let shards: Vec<_> = self.inner.nodes.iter().collect();

        shards
            .into_par_iter()
            .flat_map(|shard| {
                shard
                    .values()
                    .filter(|node| predicate(node))
                    .map(&transform)
                    .collect::<Vec<T>>()
            })
            .collect()
    }
594
595 pub fn parallel_query_reduce<P, T, F>(
607 &self,
608 predicate: P,
609 init: T,
610 fold: F,
611 ) -> T
612 where
613 P: Fn(&Node) -> bool + Send + Sync,
614 F: Fn(T, &Arc<Node>) -> T + Send + Sync,
615 T: Send + Sync + Clone,
616 {
617 let dummy_node = Arc::new(Node::new(
618 "",
619 "".to_string(),
620 Default::default(),
621 vec![],
622 vec![],
623 ));
624
625 let shards: Vec<_> = self.inner.nodes.iter().collect();
627
628 shards
630 .into_par_iter()
631 .map(|shard| {
632 shard
634 .values()
635 .filter(|node| predicate(node))
636 .fold(init.clone(), &fold)
637 })
638 .reduce(|| init.clone(), |a, _b| fold(a, &dummy_node))
640 }
641
642 fn get_all_nodes(&self) -> Vec<Arc<Node>> {
644 let mut result = Vec::new();
645 for shard in &self.inner.nodes {
646 for node in shard.values() {
647 result.push(node.clone());
648 }
649 }
650 result
651 }
652}
653
/// Chainable, condition-based query builder borrowing a [`NodePool`].
pub struct QueryEngine<'a> {
    pool: &'a NodePool,
    // All registered conditions are ANDed together at execution time.
    conditions: Vec<NodeConditionRef<'a>>,
}
659
660impl<'a> QueryEngine<'a> {
    /// Starts an empty query: with no conditions, every node matches.
    pub fn new(pool: &'a NodePool) -> Self {
        Self { pool, conditions: Vec::new() }
    }
665
666 pub fn by_type(
668 mut self,
669 node_type: &'a str,
670 ) -> Self {
671 let node_type = node_type.to_string();
672 self.conditions.push(Box::new(move |node| node.r#type == node_type));
673 self
674 }
675
676 pub fn by_attr(
678 mut self,
679 key: &'a str,
680 value: &'a serde_json::Value,
681 ) -> Self {
682 let key = key.to_string();
683 let value = value.clone();
684 self.conditions
685 .push(Box::new(move |node| node.attrs.get(&key) == Some(&value)));
686 self
687 }
688
689 pub fn by_mark(
691 mut self,
692 mark_type: &'a str,
693 ) -> Self {
694 let mark_type = mark_type.to_string();
695 self.conditions.push(Box::new(move |node| {
696 node.marks.iter().any(|mark| mark.r#type == mark_type)
697 }));
698 self
699 }
700
701 pub fn by_child_count(
703 mut self,
704 count: usize,
705 ) -> Self {
706 self.conditions.push(Box::new(move |node| node.content.len() == count));
707 self
708 }
709
710 pub fn by_depth(
712 mut self,
713 depth: usize,
714 ) -> Self {
715 let pool = self.pool.clone();
716 self.conditions.push(Box::new(move |node| {
717 pool.get_node_depth(&node.id) == Some(depth)
718 }));
719 self
720 }
721
722 pub fn by_ancestor_type(
724 mut self,
725 ancestor_type: &'a str,
726 ) -> Self {
727 let pool = self.pool.clone();
728 let ancestor_type = ancestor_type.to_string();
729 self.conditions.push(Box::new(move |node| {
730 pool.ancestors(&node.id)
731 .iter()
732 .any(|ancestor| ancestor.r#type == ancestor_type)
733 }));
734 self
735 }
736
737 pub fn by_descendant_type(
739 mut self,
740 descendant_type: &'a str,
741 ) -> Self {
742 let pool = self.pool.clone();
743 let descendant_type = descendant_type.to_string();
744 self.conditions.push(Box::new(move |node| {
745 pool.descendants(&node.id)
746 .iter()
747 .any(|descendant| descendant.r#type == descendant_type)
748 }));
749 self
750 }
751
752 pub fn find_all(&self) -> Vec<Arc<Node>> {
754 self.pool
755 .get_all_nodes()
756 .into_iter()
757 .filter(|node| {
758 self.conditions.iter().all(|condition| condition(node))
759 })
760 .collect()
761 }
762
763 pub fn find_first(&self) -> Option<Arc<Node>> {
765 self.pool.get_all_nodes().into_iter().find(|node| {
766 self.conditions.iter().all(|condition| condition(node))
767 })
768 }
769
770 pub fn count(&self) -> usize {
772 self.pool
773 .get_all_nodes()
774 .into_iter()
775 .filter(|node| {
776 self.conditions.iter().all(|condition| condition(node))
777 })
778 .count()
779 }
780
    /// Parallel version of [`find_all`], delegating to the pool's shard-level
    /// parallel query.
    pub fn parallel_find_all(&self) -> Vec<Arc<Node>> {
        let conditions = &self.conditions;
        self.pool.parallel_query(|node| {
            conditions.iter().all(|condition| condition(node))
        })
    }

    /// Parallel first-match. `find_any` returns *some* matching node, not
    /// necessarily the same one sequential `find_first` would return.
    pub fn parallel_find_first(&self) -> Option<Arc<Node>> {
        let conditions = &self.conditions;
        self.pool.get_all_nodes().into_par_iter().find_any(move |node| {
            conditions.iter().all(|condition| condition(node))
        })
    }

    /// Parallel version of [`count`].
    pub fn parallel_count(&self) -> usize {
        let conditions = &self.conditions;
        self.pool
            .get_all_nodes()
            .into_par_iter()
            .filter(move |node| {
                conditions.iter().all(|condition| condition(node))
            })
            .count()
    }
808}
809
impl NodePool {
    /// Starts a chainable [`QueryEngine`] borrowing this pool.
    pub fn query(&self) -> QueryEngine {
        QueryEngine::new(self)
    }
}
816
/// Configuration for the [`OptimizedQueryEngine`] result cache.
#[derive(Clone, Debug)]
pub struct QueryCacheConfig {
    // Maximum number of cached query results; must be > 0 when enabled.
    pub capacity: usize,
    // Whether result caching is active at all.
    pub enabled: bool,
}

impl Default for QueryCacheConfig {
    /// Caching on, with room for 1000 query results.
    fn default() -> Self {
        Self { capacity: 1000, enabled: true }
    }
}
831
/// Query engine that eagerly builds type/depth/mark indices over a pool
/// snapshot and optionally caches query results in an LRU cache.
pub struct OptimizedQueryEngine {
    pool: Arc<NodePool>,
    // None when caching is disabled in the config.
    cache: Option<LruCache<String, Vec<Arc<Node>>>>,
    // node type -> nodes of that type.
    type_index: HashMap<String, Vec<Arc<Node>>>,
    // depth -> nodes at that depth.
    depth_index: HashMap<usize, Vec<Arc<Node>>>,
    // mark type -> nodes carrying that mark.
    mark_index: HashMap<String, Vec<Arc<Node>>>,
}
840
841impl OptimizedQueryEngine {
    /// Builds an engine over a clone of `pool`, constructing all indices up
    /// front (the elapsed build time is printed).
    ///
    /// # Errors
    /// Fails when caching is enabled with `capacity == 0`, or when index
    /// construction fails.
    pub fn new(
        pool: &NodePool,
        config: QueryCacheConfig,
    ) -> PoolResult<Self> {
        let mut engine = Self {
            pool: Arc::new(pool.clone()),
            cache: if config.enabled {
                Some(LruCache::new(
                    NonZeroUsize::new(config.capacity).ok_or_else(|| {
                        anyhow::anyhow!("query cache capacity must be > 0")
                    })?,
                ))
            } else {
                None
            },
            type_index: HashMap::new(),
            depth_index: HashMap::new(),
            mark_index: HashMap::new(),
        };
        // Eager index build; timed for diagnostics.
        let start = Instant::now();
        engine.build_indices()?;
        let duration = start.elapsed();
        println!("索引构建完成,耗时: {duration:?}");
        Ok(engine)
    }
867
    /// Eagerly builds the type / depth / mark indices over every node.
    ///
    /// Nodes are gathered from all shards, sorted by id so the chunking is
    /// deterministic, then processed in parallel chunks. Each chunk builds
    /// local maps and merges them into shared maps guarded by mutexes; the
    /// shared maps are finally unwrapped into `self`.
    fn build_indices(&mut self) -> PoolResult<()> {
        use rayon::prelude::*;
        use std::collections::HashMap;
        use std::sync::Mutex;

        use std::sync::Arc;
        let node_count = self.pool.size();

        // Capacity hints are heuristics only.
        let type_index =
            Arc::new(Mutex::new(HashMap::with_capacity(node_count / 5)));
        let depth_index = Arc::new(Mutex::new(HashMap::with_capacity(10)));
        let mark_index =
            Arc::new(Mutex::new(HashMap::with_capacity(node_count / 10)));

        let optimal_shard_size = 1000;
        let mut all_nodes: Vec<_> = self
            .pool
            .inner
            .nodes
            .iter()
            .flat_map(|shard| shard.values().cloned())
            .collect();

        // Deterministic ordering before chunking.
        all_nodes.sort_by(|a, b| a.id.cmp(&b.id));

        let shards: Vec<_> = all_nodes.chunks(optimal_shard_size).collect();

        shards.into_par_iter().for_each(|shard| {
            // Per-chunk local maps, merged under the shared mutexes below.
            let mut local_type_index = HashMap::with_capacity(shard.len() / 5);
            let mut local_depth_index = HashMap::with_capacity(5);
            let mut local_mark_index = HashMap::with_capacity(shard.len() / 10);

            let mut type_nodes = Vec::with_capacity(shard.len());
            let mut depth_nodes = Vec::with_capacity(shard.len());
            let mut mark_nodes = Vec::with_capacity(shard.len() * 2);

            for node in shard {
                type_nodes.push((node.r#type.clone(), Arc::clone(node)));

                // Nodes with no resolvable depth are simply not depth-indexed.
                if let Some(depth) = self.pool.get_node_depth(&node.id) {
                    depth_nodes.push((depth, Arc::clone(node)));
                }

                for mark in &node.marks {
                    mark_nodes.push((mark.r#type.clone(), Arc::clone(node)));
                }
            }

            for (type_name, node) in type_nodes {
                local_type_index
                    .entry(type_name)
                    .or_insert_with(|| Vec::with_capacity(shard.len() / 5))
                    .push(node);
            }

            for (depth, node) in depth_nodes {
                local_depth_index
                    .entry(depth)
                    .or_insert_with(|| Vec::with_capacity(shard.len() / 10))
                    .push(node);
            }

            for (mark_type, node) in mark_nodes {
                local_mark_index
                    .entry(mark_type)
                    .or_insert_with(|| Vec::with_capacity(shard.len() / 10))
                    .push(node);
            }

            // Merge phase. NOTE(review): a poisoned mutex silently drops this
            // chunk's contribution — consider surfacing the error instead.
            {
                if let Ok(mut type_idx) = type_index.lock() {
                    for (k, v) in local_type_index {
                        type_idx
                            .entry(k)
                            .or_insert_with(|| Vec::with_capacity(v.len()))
                            .extend(v);
                    }
                } else {
                    return;
                }
            }
            {
                if let Ok(mut depth_idx) = depth_index.lock() {
                    for (k, v) in local_depth_index {
                        depth_idx
                            .entry(k)
                            .or_insert_with(|| Vec::with_capacity(v.len()))
                            .extend(v);
                    }
                } else {
                    return;
                }
            }
            {
                if let Ok(mut mark_idx) = mark_index.lock() {
                    for (k, v) in local_mark_index {
                        mark_idx
                            .entry(k)
                            .or_insert_with(|| Vec::with_capacity(v.len()))
                            .extend(v);
                    }
                }
            }
        });

        // All rayon tasks are finished, so each Arc should be unique again.
        self.type_index = Arc::try_unwrap(type_index)
            .map_err(|_| anyhow::anyhow!("type_index still has refs"))?
            .into_inner()
            .map_err(|_| anyhow::anyhow!("type_index poisoned"))?;
        self.depth_index = Arc::try_unwrap(depth_index)
            .map_err(|_| anyhow::anyhow!("depth_index still has refs"))?
            .into_inner()
            .map_err(|_| anyhow::anyhow!("depth_index poisoned"))?;
        self.mark_index = Arc::try_unwrap(mark_index)
            .map_err(|_| anyhow::anyhow!("mark_index still has refs"))?
            .into_inner()
            .map_err(|_| anyhow::anyhow!("mark_index poisoned"))?;
        Ok(())
    }
1005
1006 pub fn by_type(
1008 &self,
1009 node_type: &str,
1010 ) -> Vec<Arc<Node>> {
1011 self.type_index.get(node_type).cloned().unwrap_or_default()
1012 }
1013
1014 pub fn by_depth(
1016 &self,
1017 depth: usize,
1018 ) -> Vec<Arc<Node>> {
1019 self.depth_index.get(&depth).cloned().unwrap_or_default()
1020 }
1021
1022 pub fn by_mark(
1024 &self,
1025 mark_type: &str,
1026 ) -> Vec<Arc<Node>> {
1027 self.mark_index.get(mark_type).cloned().unwrap_or_default()
1028 }
1029
    /// Runs an AND-query, consulting the result cache first, then narrowing
    /// the candidate set via the indices before the final parallel filter.
    pub fn query(
        &mut self,
        conditions: Vec<NodeCondition>,
    ) -> Vec<Arc<Node>> {
        let cache_key = self.generate_query_cache_key(&conditions);

        // `peek` deliberately avoids needing `&mut self` here; it does not
        // refresh the entry's LRU recency.
        if let Some(cache) = &self.cache {
            if let Some(cached) = cache.peek(&cache_key) {
                return cached.clone();
            }
        }

        // Intersect any index-backed candidate sets to shrink the scan.
        let mut candidates: Option<Vec<Arc<Node>>> = None;

        for condition in &conditions {
            if let Some(indexed) = self.get_indexed_nodes(condition) {
                candidates = match candidates {
                    None => Some(indexed),
                    Some(existing) => {
                        Some(self.intersect_nodes(&existing, &indexed))
                    },
                };
            }
        }

        // Final filter: over the candidates if any index matched, otherwise
        // over the whole pool in parallel.
        let result: Vec<Arc<Node>> = match candidates {
            Some(nodes) => {
                nodes
                    .par_iter()
                    .filter(|node| {
                        conditions.iter().all(|condition| condition(node))
                    })
                    .cloned()
                    .collect()
            },
            None => {
                self.pool
                    .parallel_query(|node| {
                        conditions.iter().all(|condition| condition(node))
                    })
                    .into_iter()
                    .collect()
            },
        };

        if let Some(cache) = &mut self.cache {
            cache.put(cache_key, result.clone());
        }

        result
    }
1089
1090 fn generate_query_cache_key(
1092 &self,
1093 conditions: &[NodeCondition],
1094 ) -> String {
1095 use std::collections::hash_map::DefaultHasher;
1096 use std::hash::{Hash, Hasher};
1097
1098 let mut hasher = DefaultHasher::new();
1099
1100 conditions.len().hash(&mut hasher);
1102 self.pool.key().hash(&mut hasher);
1103
1104 for (i, _condition) in conditions.iter().enumerate() {
1106 i.hash(&mut hasher);
1108 std::ptr::addr_of!(_condition).hash(&mut hasher);
1109 }
1110
1111 format!("query_{:x}", hasher.finish())
1112 }
1113
1114 fn get_indexed_nodes(
1116 &self,
1117 condition: &(dyn Fn(&Node) -> bool + Send + Sync),
1118 ) -> Option<Vec<Arc<Node>>> {
1119 if let Some(type_nodes) = self.type_index.get("document") {
1121 if condition(&type_nodes[0]) {
1122 return Some(type_nodes.clone());
1123 }
1124 }
1125
1126 if let Some(depth_nodes) = self.depth_index.get(&0) {
1128 if condition(&depth_nodes[0]) {
1129 return Some(depth_nodes.clone());
1130 }
1131 }
1132
1133 for mark_nodes in self.mark_index.values() {
1135 if !mark_nodes.is_empty() && condition(&mark_nodes[0]) {
1136 return Some(mark_nodes.clone());
1137 }
1138 }
1139
1140 None
1141 }
1142
1143 fn intersect_nodes(
1145 &self,
1146 nodes1: &[Arc<Node>],
1147 nodes2: &[Arc<Node>],
1148 ) -> Vec<Arc<Node>> {
1149 let set1: HashSet<_> = nodes1.iter().map(|n| n.id.as_ref()).collect();
1150 nodes2
1151 .iter()
1152 .filter(|node| set1.contains(node.id.as_ref()))
1153 .cloned()
1154 .collect()
1155 }
1156}
1157
impl NodePool {
    /// Builds an [`OptimizedQueryEngine`] (eager indices) over this pool.
    ///
    /// # Errors
    /// Propagates cache-configuration and index-build failures.
    pub fn optimized_query(
        &self,
        config: QueryCacheConfig,
    ) -> PoolResult<OptimizedQueryEngine> {
        OptimizedQueryEngine::new(self, config)
    }
}
1167
// Manual Clone: field-by-field clone of the pool handle, cache, and indices.
impl Clone for OptimizedQueryEngine {
    fn clone(&self) -> Self {
        Self {
            pool: self.pool.clone(),
            cache: self.cache.clone(),
            type_index: self.type_index.clone(),
            depth_index: self.depth_index.clone(),
            mark_index: self.mark_index.clone(),
        }
    }
}
1180
/// Configuration for the [`LazyQueryEngine`].
#[derive(Clone, Debug)]
pub struct LazyQueryConfig {
    // Capacity of the per-query result cache; must be > 0.
    pub cache_capacity: usize,
    // Capacity of each lazily built index cache; must be > 0.
    pub index_cache_capacity: usize,
    // Whether the query result cache is active.
    pub cache_enabled: bool,
    // Number of repeated queries before an index is considered worth using.
    pub index_build_threshold: usize,
}

impl Default for LazyQueryConfig {
    /// Caching on; 1000 query results, 100 indices, index after 5 hits.
    fn default() -> Self {
        Self {
            cache_capacity: 1000,
            index_cache_capacity: 100,
            cache_enabled: true,
            index_build_threshold: 5,
        }
    }
}
1204
/// Per-key query statistics used to decide when lazy indices pay off.
#[derive(Debug, Clone)]
pub struct QueryStats {
    // How many times this key has been queried.
    count: usize,
    // When it was last queried.
    last_query: Instant,
}
1213
/// Query engine that builds type/depth/mark indices lazily, on demand, and
/// keeps them (plus query results) in bounded LRU caches.
pub struct LazyQueryEngine {
    pool: Arc<NodePool>,

    // None when result caching is disabled in the config.
    query_cache: Option<LruCache<String, Vec<Arc<Node>>>>,

    // Lazily built indices, evicted LRU-style.
    type_index_cache: LruCache<String, Vec<Arc<Node>>>,
    depth_index_cache: LruCache<usize, Vec<Arc<Node>>>,
    mark_index_cache: LruCache<String, Vec<Arc<Node>>>,

    // Usage counters driving the should_use_*_index heuristics.
    type_query_stats: HashMap<String, QueryStats>,
    depth_query_stats: HashMap<usize, QueryStats>,
    mark_query_stats: HashMap<String, QueryStats>,

    config: LazyQueryConfig,
}
1234
1235impl LazyQueryEngine {
    /// Builds a lazy engine over a clone of `pool`.
    ///
    /// # Panics
    /// Panics when `cache_capacity` or `index_cache_capacity` is 0.
    pub fn new(
        pool: &NodePool,
        config: LazyQueryConfig,
    ) -> Self {
        Self {
            pool: Arc::new(pool.clone()),
            query_cache: if config.cache_enabled {
                Some(LruCache::new(
                    NonZeroUsize::new(config.cache_capacity)
                        .expect("cache_capacity > 0"),
                ))
            } else {
                None
            },
            type_index_cache: LruCache::new(
                NonZeroUsize::new(config.index_cache_capacity)
                    .expect("index_cache_capacity > 0"),
            ),
            depth_index_cache: LruCache::new(
                NonZeroUsize::new(config.index_cache_capacity)
                    .expect("index_cache_capacity > 0"),
            ),
            mark_index_cache: LruCache::new(
                NonZeroUsize::new(config.index_cache_capacity)
                    .expect("index_cache_capacity > 0"),
            ),
            type_query_stats: HashMap::new(),
            depth_query_stats: HashMap::new(),
            mark_query_stats: HashMap::new(),
            config,
        }
    }
1270
    /// Type lookup that records usage stats, serves from the index cache when
    /// possible, and otherwise builds (and caches) the index on the spot.
    pub fn by_type_lazy(
        &mut self,
        node_type: &str,
    ) -> Vec<Arc<Node>> {
        self.update_type_stats(node_type);

        if let Some(cached) = self.type_index_cache.get(node_type) {
            return cached.clone();
        }

        // Cache miss: build the index now and time it.
        let start = Instant::now();
        let nodes = self.build_type_index(node_type);
        let duration = start.elapsed();

        println!(
            "实时构建类型索引 '{}', 耗时: {:?}, 节点数: {}",
            node_type,
            duration,
            nodes.len()
        );

        self.type_index_cache.put(node_type.to_string(), nodes.clone());

        nodes
    }

    /// Depth lookup; same lazy-build-and-cache pattern as [`by_type_lazy`].
    pub fn by_depth_lazy(
        &mut self,
        depth: usize,
    ) -> Vec<Arc<Node>> {
        self.update_depth_stats(depth);

        if let Some(cached) = self.depth_index_cache.get(&depth) {
            return cached.clone();
        }

        let start = Instant::now();
        let nodes = self.build_depth_index(depth);
        let duration = start.elapsed();

        println!(
            "实时构建深度索引 {}, 耗时: {:?}, 节点数: {}",
            depth,
            duration,
            nodes.len()
        );

        self.depth_index_cache.put(depth, nodes.clone());
        nodes
    }

    /// Mark lookup; same lazy-build-and-cache pattern as [`by_type_lazy`].
    pub fn by_mark_lazy(
        &mut self,
        mark_type: &str,
    ) -> Vec<Arc<Node>> {
        self.update_mark_stats(mark_type);

        if let Some(cached) = self.mark_index_cache.get(mark_type) {
            return cached.clone();
        }

        let start = Instant::now();
        let nodes = self.build_mark_index(mark_type);
        let duration = start.elapsed();

        println!(
            "实时构建标记索引 '{}', 耗时: {:?}, 节点数: {}",
            mark_type,
            duration,
            nodes.len()
        );

        self.mark_index_cache.put(mark_type.to_string(), nodes.clone());
        nodes
    }
1353
    /// Runs an arbitrary named query through the result cache: cached results
    /// are returned directly (keyed by `query_name` + pool key), otherwise
    /// `query_fn` is executed, timed, and its result cached.
    pub fn smart_query<F>(
        &mut self,
        query_name: &str,
        query_fn: F,
    ) -> Vec<Arc<Node>>
    where
        F: Fn() -> Vec<Arc<Node>>,
    {
        let cache_key = self.generate_cache_key(query_name);

        // `peek` does not refresh LRU recency (no &mut needed here).
        if let Some(cache) = &self.query_cache {
            if let Some(cached) = cache.peek(&cache_key) {
                return cached.clone();
            }
        }

        let start = Instant::now();
        let result = query_fn();
        let duration = start.elapsed();

        println!(
            "执行查询 '{}', 耗时: {:?}, 结果数: {}",
            query_name,
            duration,
            result.len()
        );

        if let Some(cache) = &mut self.query_cache {
            cache.put(cache_key, result.clone());
        }

        result
    }
1392
    /// AND-combines `conditions`: consults the result cache, narrows the
    /// candidate set via lazily built indices (only for condition kinds whose
    /// query stats exceed the build threshold), then applies a final parallel
    /// filter with every condition.
    pub fn combined_query(
        &mut self,
        conditions: &[QueryCondition],
    ) -> Vec<Arc<Node>> {
        let cache_key = self.generate_combined_cache_key(conditions);

        if let Some(cache) = &self.query_cache {
            if let Some(cached) = cache.peek(&cache_key) {
                return cached.clone();
            }
        }

        let mut candidates: Option<Vec<Arc<Node>>> = None;

        for condition in conditions {
            // Per-condition: use an index only when its usage stats say it's
            // worth building; other condition kinds never use an index.
            let indexed_nodes = match condition {
                QueryCondition::ByType(type_name) => {
                    if self.should_use_type_index(type_name) {
                        Some(self.by_type_lazy(type_name))
                    } else {
                        None
                    }
                },
                QueryCondition::ByDepth(depth) => {
                    if self.should_use_depth_index(*depth) {
                        Some(self.by_depth_lazy(*depth))
                    } else {
                        None
                    }
                },
                QueryCondition::ByMark(mark_type) => {
                    if self.should_use_mark_index(mark_type) {
                        Some(self.by_mark_lazy(mark_type))
                    } else {
                        None
                    }
                },
                QueryCondition::ByAttr { .. }
                | QueryCondition::IsLeaf
                | QueryCondition::HasChildren => None,
            };

            if let Some(indexed) = indexed_nodes {
                candidates = match candidates {
                    None => Some(indexed),
                    Some(existing) => {
                        Some(self.intersect_nodes(&existing, &indexed))
                    },
                };
            }
        }

        // Final filter: candidates if any index applied, else the whole pool.
        // Note: ByDepth::matches is always true, so an un-indexed ByDepth is
        // effectively not filtered here (see QueryCondition::matches).
        let result = match candidates {
            Some(nodes) => nodes
                .into_par_iter()
                .filter(|node| conditions.iter().all(|cond| cond.matches(node)))
                .collect(),
            None => {
                self.pool.parallel_query(|node| {
                    conditions.iter().all(|cond| cond.matches(node))
                })
            },
        };

        if let Some(cache) = &mut self.query_cache {
            cache.put(cache_key, result.clone());
        }

        result
    }
1469
1470 fn update_type_stats(
1473 &mut self,
1474 type_name: &str,
1475 ) {
1476 let stats = self
1477 .type_query_stats
1478 .entry(type_name.to_string())
1479 .or_insert(QueryStats { count: 0, last_query: Instant::now() });
1480 stats.count += 1;
1481 stats.last_query = Instant::now();
1482 }
1483
1484 fn update_depth_stats(
1485 &mut self,
1486 depth: usize,
1487 ) {
1488 let stats = self
1489 .depth_query_stats
1490 .entry(depth)
1491 .or_insert(QueryStats { count: 0, last_query: Instant::now() });
1492 stats.count += 1;
1493 stats.last_query = Instant::now();
1494 }
1495
1496 fn update_mark_stats(
1497 &mut self,
1498 mark_type: &str,
1499 ) {
1500 let stats = self
1501 .mark_query_stats
1502 .entry(mark_type.to_string())
1503 .or_insert(QueryStats { count: 0, last_query: Instant::now() });
1504 stats.count += 1;
1505 stats.last_query = Instant::now();
1506 }
1507
1508 fn should_use_type_index(
1509 &self,
1510 type_name: &str,
1511 ) -> bool {
1512 self.type_query_stats
1513 .get(type_name)
1514 .map(|stats| stats.count >= self.config.index_build_threshold)
1515 .unwrap_or(false)
1516 }
1517
1518 fn should_use_depth_index(
1519 &self,
1520 depth: usize,
1521 ) -> bool {
1522 self.depth_query_stats
1523 .get(&depth)
1524 .map(|stats| stats.count >= self.config.index_build_threshold)
1525 .unwrap_or(false)
1526 }
1527
1528 fn should_use_mark_index(
1529 &self,
1530 mark_type: &str,
1531 ) -> bool {
1532 self.mark_query_stats
1533 .get(mark_type)
1534 .map(|stats| stats.count >= self.config.index_build_threshold)
1535 .unwrap_or(false)
1536 }
1537
    /// Scans the pool (in parallel) for nodes of exactly `node_type`.
    fn build_type_index(
        &self,
        node_type: &str,
    ) -> Vec<Arc<Node>> {
        self.pool.parallel_query(|node| node.r#type == node_type)
    }

    /// Scans the pool (in parallel) for nodes at exactly `target_depth`.
    fn build_depth_index(
        &self,
        target_depth: usize,
    ) -> Vec<Arc<Node>> {
        self.pool.parallel_query(|node| {
            self.pool
                .get_node_depth(&node.id)
                .map(|depth| depth == target_depth)
                .unwrap_or(false)
        })
    }

    /// Scans the pool (in parallel) for nodes carrying a `mark_type` mark.
    fn build_mark_index(
        &self,
        mark_type: &str,
    ) -> Vec<Arc<Node>> {
        self.pool.parallel_query(|node| {
            node.marks.iter().any(|mark| mark.r#type == mark_type)
        })
    }
1565
1566 fn generate_cache_key(
1567 &self,
1568 query_name: &str,
1569 ) -> String {
1570 use std::collections::hash_map::DefaultHasher;
1571 use std::hash::{Hash, Hasher};
1572
1573 let mut hasher = DefaultHasher::new();
1574 query_name.hash(&mut hasher);
1575 self.pool.key().hash(&mut hasher);
1576 format!("query_{:x}", hasher.finish())
1577 }
1578
1579 fn generate_combined_cache_key(
1580 &self,
1581 conditions: &[QueryCondition],
1582 ) -> String {
1583 use std::collections::hash_map::DefaultHasher;
1584 use std::hash::{Hash, Hasher};
1585
1586 let mut hasher = DefaultHasher::new();
1587 for condition in conditions {
1588 condition.cache_key().hash(&mut hasher);
1589 }
1590 self.pool.key().hash(&mut hasher);
1591 format!("combined_{:x}", hasher.finish())
1592 }
1593
1594 fn intersect_nodes(
1595 &self,
1596 nodes1: &[Arc<Node>],
1597 nodes2: &[Arc<Node>],
1598 ) -> Vec<Arc<Node>> {
1599 let set1: HashSet<_> = nodes1.iter().map(|n| n.id.as_ref()).collect();
1600 nodes2
1601 .iter()
1602 .filter(|node| set1.contains(node.id.as_ref()))
1603 .cloned()
1604 .collect()
1605 }
1606
    /// Snapshot of per-key usage statistics plus current cache sizes.
    pub fn get_query_stats(&self) -> QueryStatsSummary {
        QueryStatsSummary {
            type_queries: self.type_query_stats.clone(),
            depth_queries: self.depth_query_stats.clone(),
            mark_queries: self.mark_query_stats.clone(),
            cache_hit_rates: self.calculate_cache_hit_rates(),
        }
    }

    /// Current entry counts of each cache (sizes, not actual hit rates,
    /// despite the type name).
    fn calculate_cache_hit_rates(&self) -> CacheHitRates {
        CacheHitRates {
            query_cache_size: self
                .query_cache
                .as_ref()
                .map(|c| c.len())
                .unwrap_or(0),
            type_index_cache_size: self.type_index_cache.len(),
            depth_index_cache_size: self.depth_index_cache.len(),
            mark_index_cache_size: self.mark_index_cache.len(),
        }
    }
1629}
1630
/// Declarative query condition usable by [`LazyQueryEngine::combined_query`].
#[derive(Debug, Clone)]
pub enum QueryCondition {
    ByType(String),
    ByDepth(usize),
    ByMark(String),
    ByAttr { key: String, value: serde_json::Value },
    IsLeaf,
    HasChildren,
}
1641
1642impl QueryCondition {
    /// Whether `node` satisfies this condition in isolation.
    pub fn matches(
        &self,
        node: &Node,
    ) -> bool {
        match self {
            QueryCondition::ByType(type_name) => node.r#type == *type_name,
            // ByDepth cannot be evaluated from the node alone (depth needs
            // the pool), so it is always-true here; depth filtering is
            // expected to come from the index path in `combined_query`.
            QueryCondition::ByDepth(_) => true,
            QueryCondition::ByMark(mark_type) => {
                node.marks.iter().any(|mark| mark.r#type == *mark_type)
            },
            QueryCondition::ByAttr { key, value } => {
                node.attrs.get(key) == Some(value)
            },
            QueryCondition::IsLeaf => node.content.is_empty(),
            QueryCondition::HasChildren => !node.content.is_empty(),
        }
    }
1660
1661 pub fn cache_key(&self) -> String {
1662 match self {
1663 QueryCondition::ByType(t) => format!("type_{t}"),
1664 QueryCondition::ByDepth(d) => format!("depth_{d}"),
1665 QueryCondition::ByMark(m) => format!("mark_{m}"),
1666 QueryCondition::ByAttr { key, value } => {
1667 format!(
1668 "attr_{}_{}",
1669 key,
1670 serde_json::to_string(value).unwrap_or_default()
1671 )
1672 },
1673 QueryCondition::IsLeaf => "is_leaf".to_string(),
1674 QueryCondition::HasChildren => "has_children".to_string(),
1675 }
1676 }
1677}
1678
/// Aggregated view of the lazy engine's usage stats and cache sizes.
#[derive(Debug)]
pub struct QueryStatsSummary {
    pub type_queries: HashMap<String, QueryStats>,
    pub depth_queries: HashMap<usize, QueryStats>,
    pub mark_queries: HashMap<String, QueryStats>,
    pub cache_hit_rates: CacheHitRates,
}
1687
/// Current entry counts of the lazy engine's caches.
/// NOTE(review): fields are sizes, not hit *rates* — name predates the data.
#[derive(Debug)]
pub struct CacheHitRates {
    pub query_cache_size: usize,
    pub type_index_cache_size: usize,
    pub depth_index_cache_size: usize,
    pub mark_index_cache_size: usize,
}
1696
impl NodePool {
    /// Builds a [`LazyQueryEngine`] (on-demand indices) over this pool.
    pub fn lazy_query(
        &self,
        config: LazyQueryConfig,
    ) -> LazyQueryEngine {
        LazyQueryEngine::new(self, config)
    }
}
1707
1708