use crate::error::PoolResult;
use crate::{node_definition::NodeTree, tree::Tree};
use super::{error::error_helpers, node::Node, types::NodeId};
use serde::{Deserialize, Serialize};
use std::time::Instant;
use std::{sync::Arc};
use rayon::prelude::*;
use std::marker::Sync;
use std::collections::{HashMap, HashSet};
use lru::LruCache;
use std::num::NonZeroUsize;
use std::sync::atomic::{AtomicUsize, Ordering};
/// Process-wide counter; each `NodePool` constructor takes the next value to
/// build its `pool_<n>` key.
static POOL_ID_COUNTER: AtomicUsize = AtomicUsize::new(0);
/// Owned, thread-shareable node predicate (used by `OptimizedQueryEngine::query`).
type NodeCondition = Box<dyn Fn(&Node) -> bool + Send + Sync>;
/// Node predicate that may borrow data for `'a` (stored by `QueryEngine<'a>`).
type NodeConditionRef<'a> = Box<dyn Fn(&Node) -> bool + Send + Sync + 'a>;
/// Read-only view over a node `Tree`, tagged with an identifying key.
///
/// `NodePool::new` / `NodePool::from` generate the key from `POOL_ID_COUNTER`.
/// The query engines in this module mix the key into their cache keys.
/// NOTE(review): the derived `Clone` and `Deserialize` copy `key` verbatim, so
/// two pool values can end up sharing a key.
#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
pub struct NodePool {
/// The underlying tree, shared through an `Arc`.
inner: Arc<Tree>,
/// Identifier of the form `pool_<n>`.
key: String,
}
impl NodePool {
    /// Returns a fresh key of the form `pool_<n>`, taking `n` from the
    /// process-wide `POOL_ID_COUNTER`.
    fn next_key() -> String {
        let id = POOL_ID_COUNTER.fetch_add(1, Ordering::SeqCst);
        format!("pool_{id}")
    }

    /// Wraps an existing tree in a new pool with a freshly generated key.
    pub fn new(inner: Arc<Tree>) -> Arc<NodePool> {
        Arc::new(Self { inner, key: Self::next_key() })
    }

    /// Returns this pool's key (`pool_<n>`).
    pub fn key(&self) -> &str {
        &self.key
    }

    /// Total number of nodes, summed over every shard of the tree.
    pub fn size(&self) -> usize {
        self.inner.nodes.iter().map(|i| i.values().len()).sum()
    }

    /// Returns the root node.
    ///
    /// NOTE(review): this indexes the tree with `root_id`; it presumably panics
    /// if the root is absent — confirm against `Tree`'s `Index` impl.
    pub fn root(&self) -> Arc<Node> {
        self.inner[&self.inner.root_id].clone()
    }

    /// Returns the id of the root node.
    pub fn root_id(&self) -> &NodeId {
        &self.inner.root_id
    }

    /// Returns the shared underlying tree.
    pub fn get_inner(&self) -> &Arc<Tree> {
        &self.inner
    }

    /// Builds a new tree from `nodes` and wraps it in a pool with a fresh key.
    pub fn from(nodes: NodeTree) -> Arc<NodePool> {
        // Take the key first, matching the original order of side effects.
        let key = Self::next_key();
        Arc::new(Self { inner: Arc::new(Tree::from(nodes)), key })
    }

    /// Looks up a node by id.
    pub fn get_node(
        &self,
        id: &NodeId,
    ) -> Option<Arc<Node>> {
        self.inner.get_node(id)
    }

    /// Looks up the parent node of `id`.
    pub fn get_parent_node(
        &self,
        id: &NodeId,
    ) -> Option<Arc<Node>> {
        self.inner.get_parent_node(id)
    }

    /// Returns whether the tree contains a node with this id.
    pub fn contains_node(
        &self,
        id: &NodeId,
    ) -> bool {
        self.inner.contains_node(id)
    }

    /// Returns the child ids (the node's `content`), or `None` if the node is unknown.
    pub fn children(
        &self,
        parent_id: &NodeId,
    ) -> Option<imbl::Vector<NodeId>> {
        self.get_node(parent_id).map(|n| n.content.clone())
    }

    /// Returns all descendants of `parent_id` in depth-first pre-order.
    /// The node itself is excluded.
    pub fn descendants(
        &self,
        parent_id: &NodeId,
    ) -> Vec<Arc<Node>> {
        let mut result: Vec<Arc<Node>> = Vec::new();
        self._collect_descendants(parent_id, &mut result);
        result
    }

    /// Recursive helper for [`Self::descendants`].
    /// Child ids that do not resolve to a node are skipped along with their subtrees.
    fn _collect_descendants(
        &self,
        parent_id: &NodeId,
        result: &mut Vec<Arc<Node>>,
    ) {
        if let Some(children) = self.children(parent_id) {
            for child_id in &children {
                if let Some(child) = self.get_node(child_id) {
                    result.push(child);
                    self._collect_descendants(child_id, result);
                }
            }
        }
    }

    /// Calls `f` on each *direct* child of `id` that resolves to a node.
    pub fn for_each<F>(
        &self,
        id: &NodeId,
        f: F,
    ) where
        F: Fn(&Arc<Node>),
    {
        if let Some(children) = self.children(id) {
            for child_id in &children {
                if let Some(child) = self.get_node(child_id) {
                    f(&child);
                }
            }
        }
    }

    /// Returns the parent id of `child_id` from the tree's parent map.
    pub fn parent_id(
        &self,
        child_id: &NodeId,
    ) -> Option<&NodeId> {
        self.inner.parent_map.get(child_id)
    }

    /// Returns the ancestors of `child_id`, nearest parent first.
    /// The walk stops at the first parent id that does not resolve to a node.
    pub fn ancestors(
        &self,
        child_id: &NodeId,
    ) -> Vec<Arc<Node>> {
        let mut chain = Vec::new();
        let mut current_id = child_id;
        while let Some(parent_id) = self.parent_id(current_id) {
            if let Some(parent) = self.get_node(parent_id) {
                chain.push(parent);
                current_id = parent_id;
            } else {
                break;
            }
        }
        chain
    }

    /// Checks that every parent-map entry points to an existing parent that
    /// lists the child in its `content`.
    ///
    /// # Errors
    /// Returns an orphan-node error if the parent is missing.
    /// Returns an invalid-parenting error if the parent does not list the child.
    pub fn validate_hierarchy(&self) -> PoolResult<()> {
        for (child_id, parent_id) in &self.inner.parent_map {
            if !self.contains_node(parent_id) {
                return Err(error_helpers::orphan_node(child_id.clone()));
            }
            if let Some(children) = self.children(parent_id) {
                if !children.contains(child_id) {
                    return Err(error_helpers::invalid_parenting(
                        child_id.clone(),
                        parent_id.clone(),
                    ));
                }
            }
        }
        Ok(())
    }

    /// Returns every node accepted by `predicate` (sequential scan).
    pub fn filter_nodes<P>(
        &self,
        predicate: P,
    ) -> Vec<Arc<Node>>
    where
        P: Fn(&Node) -> bool,
    {
        self.get_all_nodes().into_iter().filter(|n| predicate(n)).collect()
    }

    /// Returns the first node accepted by `predicate`, in shard order.
    pub fn find_node<P>(
        &self,
        predicate: P,
    ) -> Option<Arc<Node>>
    where
        P: Fn(&Node) -> bool,
    {
        self.get_all_nodes().into_iter().find(|n| predicate(n))
    }

    /// Returns the number of parent links from `node_id` up to the root (the root has depth 0).
    ///
    /// Returns `None` when `node_id` is not in the pool. Previously an unknown
    /// id was reported as `Some(0)`, indistinguishable from the root.
    pub fn get_node_depth(
        &self,
        node_id: &NodeId,
    ) -> Option<usize> {
        if !self.contains_node(node_id) {
            return None;
        }
        let mut depth = 0;
        let mut current_id = node_id;
        while let Some(parent_id) = self.parent_id(current_id) {
            depth += 1;
            current_id = parent_id;
        }
        Some(depth)
    }

    /// Returns the id chain from the top-most reachable ancestor down to `node_id`.
    /// The chain includes `node_id`, so an id without a parent entry yields `[node_id]`.
    pub fn get_node_path(
        &self,
        node_id: &NodeId,
    ) -> Vec<NodeId> {
        let mut path = Vec::new();
        let mut current_id = node_id;
        while let Some(parent_id) = self.parent_id(current_id) {
            path.push(current_id.clone());
            current_id = parent_id;
        }
        path.push(current_id.clone());
        path.reverse();
        path
    }

    /// Returns the nodes on the path from the top-most ancestor down to `node_id` (inclusive).
    /// Ids that do not resolve to a node are skipped.
    pub fn resolve(
        &self,
        node_id: &NodeId,
    ) -> Vec<Arc<Node>> {
        let mut result = Vec::new();
        let mut current_id = node_id;
        loop {
            if let Some(node) = self.get_node(current_id) {
                result.push(node);
            }
            if let Some(parent_id) = self.parent_id(current_id) {
                current_id = parent_id;
            } else {
                break;
            }
        }
        result.reverse();
        result
    }

    /// Returns `true` if the node has no children, or if it is not in the pool.
    pub fn is_leaf(
        &self,
        node_id: &NodeId,
    ) -> bool {
        self.children(node_id).map_or(true, |children| children.is_empty())
    }

    /// Returns the parent's child list and `node_id`'s index within it.
    ///
    /// Returns `None` if the node has no parent or the parent is missing.
    /// Also returns `None`, and prints a warning, when the parent does not list
    /// the node among its children.
    fn siblings_with_position(
        &self,
        node_id: &NodeId,
    ) -> Option<(imbl::Vector<NodeId>, usize)> {
        let parent_id = self.parent_id(node_id)?;
        let siblings = self.children(parent_id)?;
        let position = siblings.iter().position(|id| id == node_id);
        match position {
            Some(index) => Some((siblings, index)),
            None => {
                eprintln!(
                    "Warning: Node {node_id:?} not found in parent's children list"
                );
                None
            },
        }
    }

    /// Returns the ids of the siblings before `node_id`, in order.
    pub fn get_left_siblings(
        &self,
        node_id: &NodeId,
    ) -> Vec<NodeId> {
        self.siblings_with_position(node_id)
            .map(|(siblings, index)| siblings.iter().take(index).cloned().collect())
            .unwrap_or_default()
    }

    /// Returns the ids of the siblings after `node_id`, in order.
    pub fn get_right_siblings(
        &self,
        node_id: &NodeId,
    ) -> Vec<NodeId> {
        self.siblings_with_position(node_id)
            .map(|(siblings, index)| {
                siblings.iter().skip(index + 1).cloned().collect()
            })
            .unwrap_or_default()
    }

    /// Returns the sibling nodes before `node_id`, skipping unresolved ids.
    pub fn get_left_nodes(
        &self,
        node_id: &NodeId,
    ) -> Vec<Arc<Node>> {
        self.get_left_siblings(node_id)
            .iter()
            .filter_map(|sibling_id| self.get_node(sibling_id))
            .collect()
    }

    /// Returns the sibling nodes after `node_id`, skipping unresolved ids.
    pub fn get_right_nodes(
        &self,
        node_id: &NodeId,
    ) -> Vec<Arc<Node>> {
        self.get_right_siblings(node_id)
            .iter()
            .filter_map(|sibling_id| self.get_node(sibling_id))
            .collect()
    }

    /// Returns the full child list of `node_id`'s parent, *including* `node_id` itself.
    pub fn get_all_siblings(
        &self,
        node_id: &NodeId,
    ) -> Vec<NodeId> {
        self.parent_id(node_id)
            .and_then(|parent_id| self.children(parent_id))
            .map(|children| children.iter().cloned().collect())
            .unwrap_or_default()
    }

    /// Counts `node_id` plus all ids reachable through `content`.
    /// Child ids that do not resolve to a node each count as 1.
    pub fn get_subtree_size(
        &self,
        node_id: &NodeId,
    ) -> usize {
        let mut size = 1;
        if let Some(children) = self.children(node_id) {
            for child_id in &children {
                size += self.get_subtree_size(child_id);
            }
        }
        size
    }

    /// Returns `true` if `ancestor_id` is a strict ancestor of `descendant_id`.
    pub fn is_ancestor(
        &self,
        ancestor_id: &NodeId,
        descendant_id: &NodeId,
    ) -> bool {
        let mut current_id = descendant_id;
        while let Some(parent_id) = self.parent_id(current_id) {
            if parent_id == ancestor_id {
                return true;
            }
            current_id = parent_id;
        }
        false
    }

    /// Returns the deepest id lying on both nodes' root paths.
    ///
    /// A node counts as its own ancestor here. Returns `None` if the paths share no id.
    pub fn get_lowest_common_ancestor(
        &self,
        node1_id: &NodeId,
        node2_id: &NodeId,
    ) -> Option<NodeId> {
        let path1 = self.get_node_path(node1_id);
        let path2 = self.get_node_path(node2_id);
        path1.iter().rev().find(|ancestor_id| path2.contains(ancestor_id)).cloned()
    }

    /// Returns every node accepted by `predicate`, evaluating shards in parallel.
    /// Results are in shard order.
    pub fn parallel_query<P>(
        &self,
        predicate: P,
    ) -> Vec<Arc<Node>>
    where
        P: Fn(&Node) -> bool + Send + Sync,
    {
        let shards: Vec<_> = self.inner.nodes.iter().collect();
        shards
            .into_par_iter()
            .flat_map(|shard| {
                shard
                    .values()
                    .filter(|node| predicate(node))
                    .cloned()
                    .collect::<Vec<_>>()
            })
            .collect()
    }

    /// Splits each shard into batches of `batch_size` nodes and concatenates
    /// whatever `predicate` returns for each batch. Shards are processed in parallel.
    ///
    /// A `batch_size` of 0 is treated as 1; `slice::chunks(0)` would panic.
    pub fn parallel_batch_query<P>(
        &self,
        batch_size: usize,
        predicate: P,
    ) -> Vec<Arc<Node>>
    where
        P: Fn(&[Arc<Node>]) -> Vec<Arc<Node>> + Send + Sync,
    {
        let batch_size = batch_size.max(1);
        let shards: Vec<_> = self.inner.nodes.iter().collect();
        shards
            .into_par_iter()
            .flat_map(|shard| {
                let nodes: Vec<_> = shard.values().cloned().collect();
                nodes
                    .chunks(batch_size)
                    .flat_map(&predicate)
                    .collect::<Vec<_>>()
            })
            .collect()
    }

    /// Filters nodes with `predicate` and maps the matches through `transform`.
    /// Shards are evaluated in parallel.
    pub fn parallel_query_map<P, T, F>(
        &self,
        predicate: P,
        transform: F,
    ) -> Vec<T>
    where
        P: Fn(&Node) -> bool + Send + Sync,
        F: Fn(&Arc<Node>) -> T + Send + Sync,
        T: Send,
    {
        let shards: Vec<_> = self.inner.nodes.iter().collect();
        shards
            .into_par_iter()
            .flat_map(|shard| {
                shard
                    .values()
                    .filter(|node| predicate(node))
                    .map(&transform)
                    .collect::<Vec<T>>()
            })
            .collect()
    }

    /// Folds every node accepted by `predicate` into `init` using `fold`.
    ///
    /// `fold` has no way to merge two partial accumulators, so per-shard
    /// results cannot be combined soundly. The previous implementation discarded
    /// all but one shard's result and folded a dummy node instead. The
    /// predicate now runs in parallel, and the matches are folded sequentially,
    /// in shard order, starting from `init` exactly once.
    pub fn parallel_query_reduce<P, T, F>(
        &self,
        predicate: P,
        init: T,
        fold: F,
    ) -> T
    where
        P: Fn(&Node) -> bool + Send + Sync,
        F: Fn(T, &Arc<Node>) -> T + Send + Sync,
        T: Send + Sync + Clone,
    {
        self.parallel_query(predicate)
            .iter()
            .fold(init, |acc, node| fold(acc, node))
    }

    /// Clones every node handle out of every shard, in shard order.
    fn get_all_nodes(&self) -> Vec<Arc<Node>> {
        self.inner
            .nodes
            .iter()
            .flat_map(|shard| shard.values().cloned())
            .collect()
    }
}
/// Builder-style query over a borrowed `NodePool`.
///
/// Each `by_*` call adds one predicate. A node matches only if every
/// predicate accepts it.
pub struct QueryEngine<'a> {
/// The pool being queried.
pool: &'a NodePool,
/// Predicates, combined with logical AND by the `find_*` / `count` methods.
conditions: Vec<NodeConditionRef<'a>>,
}
impl<'a> QueryEngine<'a> {
pub fn new(pool: &'a NodePool) -> Self {
Self { pool, conditions: Vec::new() }
}
pub fn by_type(
mut self,
node_type: &'a str,
) -> Self {
let node_type = node_type.to_string();
self.conditions.push(Box::new(move |node| node.r#type == node_type));
self
}
pub fn by_attr(
mut self,
key: &'a str,
value: &'a serde_json::Value,
) -> Self {
let key = key.to_string();
let value = value.clone();
self.conditions
.push(Box::new(move |node| node.attrs.get(&key) == Some(&value)));
self
}
pub fn by_mark(
mut self,
mark_type: &'a str,
) -> Self {
let mark_type = mark_type.to_string();
self.conditions.push(Box::new(move |node| {
node.marks.iter().any(|mark| mark.r#type == mark_type)
}));
self
}
pub fn by_child_count(
mut self,
count: usize,
) -> Self {
self.conditions.push(Box::new(move |node| node.content.len() == count));
self
}
pub fn by_depth(
mut self,
depth: usize,
) -> Self {
let pool = self.pool.clone();
self.conditions.push(Box::new(move |node| {
pool.get_node_depth(&node.id) == Some(depth)
}));
self
}
pub fn by_ancestor_type(
mut self,
ancestor_type: &'a str,
) -> Self {
let pool = self.pool.clone();
let ancestor_type = ancestor_type.to_string();
self.conditions.push(Box::new(move |node| {
pool.ancestors(&node.id)
.iter()
.any(|ancestor| ancestor.r#type == ancestor_type)
}));
self
}
pub fn by_descendant_type(
mut self,
descendant_type: &'a str,
) -> Self {
let pool = self.pool.clone();
let descendant_type = descendant_type.to_string();
self.conditions.push(Box::new(move |node| {
pool.descendants(&node.id)
.iter()
.any(|descendant| descendant.r#type == descendant_type)
}));
self
}
pub fn find_all(&self) -> Vec<Arc<Node>> {
self.pool
.get_all_nodes()
.into_iter()
.filter(|node| {
self.conditions.iter().all(|condition| condition(node))
})
.collect()
}
pub fn find_first(&self) -> Option<Arc<Node>> {
self.pool.get_all_nodes().into_iter().find(|node| {
self.conditions.iter().all(|condition| condition(node))
})
}
pub fn count(&self) -> usize {
self.pool
.get_all_nodes()
.into_iter()
.filter(|node| {
self.conditions.iter().all(|condition| condition(node))
})
.count()
}
pub fn parallel_find_all(&self) -> Vec<Arc<Node>> {
let conditions = &self.conditions;
self.pool.parallel_query(|node| {
conditions.iter().all(|condition| condition(node))
})
}
pub fn parallel_find_first(&self) -> Option<Arc<Node>> {
let conditions = &self.conditions;
self.pool.get_all_nodes().into_par_iter().find_any(move |node| {
conditions.iter().all(|condition| condition(node))
})
}
pub fn parallel_count(&self) -> usize {
let conditions = &self.conditions;
self.pool
.get_all_nodes()
.into_par_iter()
.filter(move |node| {
conditions.iter().all(|condition| condition(node))
})
.count()
}
}
impl NodePool {
pub fn query(&self) -> QueryEngine<'_> {
QueryEngine::new(self)
}
}
/// Settings for the query cache held by `OptimizedQueryEngine`.
#[derive(Clone, Debug)]
pub struct QueryCacheConfig {
    /// Maximum number of cached entries. `OptimizedQueryEngine::new` rejects 0
    /// when `enabled` is set.
    pub capacity: usize,
    /// Whether a cache is allocated at all.
    pub enabled: bool,
}

impl Default for QueryCacheConfig {
    /// An enabled cache with room for 1000 entries.
    fn default() -> Self {
        let capacity = 1000;
        let enabled = true;
        QueryCacheConfig { enabled, capacity }
    }
}
/// Query engine over a snapshot of a `NodePool`, with type, depth and mark
/// indices built when the engine is constructed.
pub struct OptimizedQueryEngine {
/// Snapshot of the pool (a clone of the `NodePool` passed to `new`).
pool: Arc<NodePool>,
/// Optional LRU cache; `None` unless `QueryCacheConfig::enabled` was set.
cache: Option<LruCache<String, Vec<Arc<Node>>>>,
/// Nodes grouped by their `type`.
type_index: HashMap<String, Vec<Arc<Node>>>,
/// Nodes grouped by depth (number of parent links up to the root).
depth_index: HashMap<usize, Vec<Arc<Node>>>,
/// Nodes grouped by mark type; a node appears once per mark it carries.
mark_index: HashMap<String, Vec<Arc<Node>>>,
}
impl OptimizedQueryEngine {
pub fn new(
pool: &NodePool,
config: QueryCacheConfig,
) -> PoolResult<Self> {
let mut engine = Self {
pool: Arc::new(pool.clone()),
cache: if config.enabled {
Some(LruCache::new(
NonZeroUsize::new(config.capacity).ok_or_else(|| {
anyhow::anyhow!("query cache capacity must be > 0")
})?,
))
} else {
None
},
type_index: HashMap::new(),
depth_index: HashMap::new(),
mark_index: HashMap::new(),
};
let start = Instant::now();
engine.build_indices()?;
let duration = start.elapsed();
println!("索引构建完成,耗时: {duration:?}");
Ok(engine)
}
fn build_indices(&mut self) -> PoolResult<()> {
use rayon::prelude::*;
use std::collections::HashMap;
use std::sync::Mutex;
use std::sync::Arc;
let node_count = self.pool.size();
let type_index =
Arc::new(Mutex::new(HashMap::with_capacity(node_count / 5)));
let depth_index = Arc::new(Mutex::new(HashMap::with_capacity(10)));
let mark_index =
Arc::new(Mutex::new(HashMap::with_capacity(node_count / 10)));
let optimal_shard_size = 1000;
let mut all_nodes: Vec<_> = self
.pool
.inner
.nodes
.iter()
.flat_map(|shard| shard.values().cloned())
.collect();
all_nodes.sort_by(|a, b| a.id.cmp(&b.id));
let shards: Vec<_> = all_nodes.chunks(optimal_shard_size).collect();
shards.into_par_iter().for_each(|shard| {
let mut local_type_index = HashMap::with_capacity(shard.len() / 5);
let mut local_depth_index = HashMap::with_capacity(5);
let mut local_mark_index = HashMap::with_capacity(shard.len() / 10);
let mut type_nodes = Vec::with_capacity(shard.len());
let mut depth_nodes = Vec::with_capacity(shard.len());
let mut mark_nodes = Vec::with_capacity(shard.len() * 2);
for node in shard {
type_nodes.push((node.r#type.clone(), Arc::clone(node)));
if let Some(depth) = self.pool.get_node_depth(&node.id) {
depth_nodes.push((depth, Arc::clone(node)));
}
for mark in &node.marks {
mark_nodes.push((mark.r#type.clone(), Arc::clone(node)));
}
}
for (type_name, node) in type_nodes {
local_type_index
.entry(type_name)
.or_insert_with(|| Vec::with_capacity(shard.len() / 5))
.push(node);
}
for (depth, node) in depth_nodes {
local_depth_index
.entry(depth)
.or_insert_with(|| Vec::with_capacity(shard.len() / 10))
.push(node);
}
for (mark_type, node) in mark_nodes {
local_mark_index
.entry(mark_type)
.or_insert_with(|| Vec::with_capacity(shard.len() / 10))
.push(node);
}
{
if let Ok(mut type_idx) = type_index.lock() {
for (k, v) in local_type_index {
type_idx
.entry(k)
.or_insert_with(|| Vec::with_capacity(v.len()))
.extend(v);
}
} else {
return;
}
}
{
if let Ok(mut depth_idx) = depth_index.lock() {
for (k, v) in local_depth_index {
depth_idx
.entry(k)
.or_insert_with(|| Vec::with_capacity(v.len()))
.extend(v);
}
} else {
return;
}
}
{
if let Ok(mut mark_idx) = mark_index.lock() {
for (k, v) in local_mark_index {
mark_idx
.entry(k)
.or_insert_with(|| Vec::with_capacity(v.len()))
.extend(v);
}
}
}
});
self.type_index = Arc::try_unwrap(type_index)
.map_err(|_| anyhow::anyhow!("type_index still has refs"))?
.into_inner()
.map_err(|_| anyhow::anyhow!("type_index poisoned"))?;
self.depth_index = Arc::try_unwrap(depth_index)
.map_err(|_| anyhow::anyhow!("depth_index still has refs"))?
.into_inner()
.map_err(|_| anyhow::anyhow!("depth_index poisoned"))?;
self.mark_index = Arc::try_unwrap(mark_index)
.map_err(|_| anyhow::anyhow!("mark_index still has refs"))?
.into_inner()
.map_err(|_| anyhow::anyhow!("mark_index poisoned"))?;
Ok(())
}
pub fn by_type(
&self,
node_type: &str,
) -> Vec<Arc<Node>> {
self.type_index.get(node_type).cloned().unwrap_or_default()
}
pub fn by_depth(
&self,
depth: usize,
) -> Vec<Arc<Node>> {
self.depth_index.get(&depth).cloned().unwrap_or_default()
}
pub fn by_mark(
&self,
mark_type: &str,
) -> Vec<Arc<Node>> {
self.mark_index.get(mark_type).cloned().unwrap_or_default()
}
pub fn query(
&mut self,
conditions: Vec<NodeCondition>,
) -> Vec<Arc<Node>> {
let cache_key = self.generate_query_cache_key(&conditions);
if let Some(cache) = &self.cache {
if let Some(cached) = cache.peek(&cache_key) {
return cached.clone();
}
}
let mut candidates: Option<Vec<Arc<Node>>> = None;
for condition in &conditions {
if let Some(indexed) = self.get_indexed_nodes(condition) {
candidates = match candidates {
None => Some(indexed),
Some(existing) => {
Some(self.intersect_nodes(&existing, &indexed))
},
};
}
}
let result: Vec<Arc<Node>> = match candidates {
Some(nodes) => {
nodes
.par_iter()
.filter(|node| {
conditions.iter().all(|condition| condition(node))
})
.cloned()
.collect()
},
None => {
self.pool
.parallel_query(|node| {
conditions.iter().all(|condition| condition(node))
})
.into_iter()
.collect()
},
};
if let Some(cache) = &mut self.cache {
cache.put(cache_key, result.clone());
}
result
}
fn generate_query_cache_key(
&self,
conditions: &[NodeCondition],
) -> String {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
let mut hasher = DefaultHasher::new();
conditions.len().hash(&mut hasher);
self.pool.key().hash(&mut hasher);
for (i, _condition) in conditions.iter().enumerate() {
i.hash(&mut hasher);
std::ptr::addr_of!(_condition).hash(&mut hasher);
}
format!("query_{:x}", hasher.finish())
}
fn get_indexed_nodes(
&self,
condition: &(dyn Fn(&Node) -> bool + Send + Sync),
) -> Option<Vec<Arc<Node>>> {
if let Some(type_nodes) = self.type_index.get("document") {
if condition(&type_nodes[0]) {
return Some(type_nodes.clone());
}
}
if let Some(depth_nodes) = self.depth_index.get(&0) {
if condition(&depth_nodes[0]) {
return Some(depth_nodes.clone());
}
}
for mark_nodes in self.mark_index.values() {
if !mark_nodes.is_empty() && condition(&mark_nodes[0]) {
return Some(mark_nodes.clone());
}
}
None
}
fn intersect_nodes(
&self,
nodes1: &[Arc<Node>],
nodes2: &[Arc<Node>],
) -> Vec<Arc<Node>> {
let set1: HashSet<_> = nodes1.iter().map(|n| n.id.as_ref()).collect();
nodes2
.iter()
.filter(|node| set1.contains(node.id.as_ref()))
.cloned()
.collect()
}
}
impl NodePool {
    /// Builds an [`OptimizedQueryEngine`] over a snapshot of this pool.
    ///
    /// # Errors
    /// Propagates any error from [`OptimizedQueryEngine::new`].
    pub fn optimized_query(
        &self,
        config: QueryCacheConfig,
    ) -> PoolResult<OptimizedQueryEngine> {
        let engine = OptimizedQueryEngine::new(self, config)?;
        Ok(engine)
    }
}
impl Clone for OptimizedQueryEngine {
fn clone(&self) -> Self {
Self {
pool: self.pool.clone(),
cache: self.cache.clone(),
type_index: self.type_index.clone(),
depth_index: self.depth_index.clone(),
mark_index: self.mark_index.clone(),
}
}
}
/// Tuning knobs for `LazyQueryEngine`.
#[derive(Clone, Debug)]
pub struct LazyQueryConfig {
    /// Capacity of the query-result cache.
    pub cache_capacity: usize,
    /// Capacity of each per-kind (type / depth / mark) index cache.
    pub index_cache_capacity: usize,
    /// Whether the query-result cache is allocated.
    pub cache_enabled: bool,
    /// Number of lazy lookups of a key after which `combined_query` starts
    /// using that key's index.
    pub index_build_threshold: usize,
}

impl Default for LazyQueryConfig {
    /// Cache 1000 results, keep 100 indices per kind, and switch to an index
    /// after 5 lookups.
    fn default() -> Self {
        let cache_capacity = 1000;
        let index_cache_capacity = 100;
        LazyQueryConfig {
            index_build_threshold: 5,
            cache_enabled: true,
            cache_capacity,
            index_cache_capacity,
        }
    }
}
/// Usage record for one lazily indexed key (a type name, depth or mark type).
#[derive(Debug, Clone)]
pub struct QueryStats {
/// How many times the key was requested through the matching `by_*_lazy` method.
count: usize,
/// When the key was last requested.
last_query: Instant,
}
/// Query engine over a snapshot of a `NodePool`. It builds per-key node lists
/// on demand and keeps them in bounded LRU caches.
pub struct LazyQueryEngine {
/// Snapshot of the pool (a clone of the `NodePool` passed to `new`).
pool: Arc<NodePool>,
/// Results of `smart_query` / `combined_query`; `None` when caching is disabled.
query_cache: Option<LruCache<String, Vec<Arc<Node>>>>,
/// On-demand "nodes of type X" lists.
type_index_cache: LruCache<String, Vec<Arc<Node>>>,
/// On-demand "nodes at depth N" lists.
depth_index_cache: LruCache<usize, Vec<Arc<Node>>>,
/// On-demand "nodes carrying a mark of type X" lists.
mark_index_cache: LruCache<String, Vec<Arc<Node>>>,
/// Per-type request counts from `by_type_lazy`.
type_query_stats: HashMap<String, QueryStats>,
/// Per-depth request counts from `by_depth_lazy`.
depth_query_stats: HashMap<usize, QueryStats>,
/// Per-mark request counts from `by_mark_lazy`.
mark_query_stats: HashMap<String, QueryStats>,
/// Settings supplied at construction.
config: LazyQueryConfig,
}
impl LazyQueryEngine {
    /// Creates an engine over a snapshot (clone) of `pool`.
    ///
    /// A zero `cache_capacity` leaves the query-result cache disabled, since
    /// such a cache could hold nothing. A zero `index_cache_capacity` is raised
    /// to one entry per index cache, because an LRU cache cannot be empty.
    /// Neither case panics.
    pub fn new(
        pool: &NodePool,
        config: LazyQueryConfig,
    ) -> Self {
        let index_capacity = NonZeroUsize::new(config.index_cache_capacity.max(1))
            .expect("index cache capacity was clamped to at least 1");
        let query_cache = if config.cache_enabled {
            NonZeroUsize::new(config.cache_capacity).map(LruCache::new)
        } else {
            None
        };
        Self {
            pool: Arc::new(pool.clone()),
            query_cache,
            type_index_cache: LruCache::new(index_capacity),
            depth_index_cache: LruCache::new(index_capacity),
            mark_index_cache: LruCache::new(index_capacity),
            type_query_stats: HashMap::new(),
            depth_query_stats: HashMap::new(),
            mark_query_stats: HashMap::new(),
            config,
        }
    }

    /// Returns all nodes of type `node_type`, building and caching the list on
    /// first use. Every call is counted in the type statistics.
    pub fn by_type_lazy(
        &mut self,
        node_type: &str,
    ) -> Vec<Arc<Node>> {
        self.update_type_stats(node_type);
        if let Some(cached) = self.type_index_cache.get(node_type) {
            return cached.clone();
        }
        let start = Instant::now();
        let nodes = self.build_type_index(node_type);
        let duration = start.elapsed();
        println!(
            "实时构建类型索引 '{}', 耗时: {:?}, 节点数: {}",
            node_type,
            duration,
            nodes.len()
        );
        self.type_index_cache.put(node_type.to_string(), nodes.clone());
        nodes
    }

    /// Returns all nodes at `depth`, building and caching the list on first
    /// use. Every call is counted in the depth statistics.
    pub fn by_depth_lazy(
        &mut self,
        depth: usize,
    ) -> Vec<Arc<Node>> {
        self.update_depth_stats(depth);
        if let Some(cached) = self.depth_index_cache.get(&depth) {
            return cached.clone();
        }
        let start = Instant::now();
        let nodes = self.build_depth_index(depth);
        let duration = start.elapsed();
        println!(
            "实时构建深度索引 {}, 耗时: {:?}, 节点数: {}",
            depth,
            duration,
            nodes.len()
        );
        self.depth_index_cache.put(depth, nodes.clone());
        nodes
    }

    /// Returns all nodes carrying a `mark_type` mark, building and caching the
    /// list on first use. Every call is counted in the mark statistics.
    pub fn by_mark_lazy(
        &mut self,
        mark_type: &str,
    ) -> Vec<Arc<Node>> {
        self.update_mark_stats(mark_type);
        if let Some(cached) = self.mark_index_cache.get(mark_type) {
            return cached.clone();
        }
        let start = Instant::now();
        let nodes = self.build_mark_index(mark_type);
        let duration = start.elapsed();
        println!(
            "实时构建标记索引 '{}', 耗时: {:?}, 节点数: {}",
            mark_type,
            duration,
            nodes.len()
        );
        self.mark_index_cache.put(mark_type.to_string(), nodes.clone());
        nodes
    }

    /// Runs `query_fn` and caches its result under `query_name`.
    ///
    /// The cache is keyed by name only. Callers must use a distinct name for
    /// each distinct query.
    pub fn smart_query<F>(
        &mut self,
        query_name: &str,
        query_fn: F,
    ) -> Vec<Arc<Node>>
    where
        F: Fn() -> Vec<Arc<Node>>,
    {
        let cache_key = self.generate_cache_key(query_name);
        if let Some(cache) = &self.query_cache {
            if let Some(cached) = cache.peek(&cache_key) {
                return cached.clone();
            }
        }
        let start = Instant::now();
        let result = query_fn();
        let duration = start.elapsed();
        println!(
            "执行查询 '{}', 耗时: {:?}, 结果数: {}",
            query_name,
            duration,
            result.len()
        );
        if let Some(cache) = &mut self.query_cache {
            cache.put(cache_key, result.clone());
        }
        result
    }

    /// Returns the nodes that satisfy every condition in `conditions`.
    ///
    /// For each type, depth or mark condition whose key has been looked up at
    /// least `index_build_threshold` times, the lazy index narrows the
    /// candidate set. All conditions are then re-checked on the candidates, or
    /// on the whole pool if no index was used. `ByDepth` is checked against the
    /// pool's parent links; previously, without the depth index, it was
    /// silently ignored because `QueryCondition::matches` accepts any node for
    /// that variant.
    pub fn combined_query(
        &mut self,
        conditions: &[QueryCondition],
    ) -> Vec<Arc<Node>> {
        let cache_key = self.generate_combined_cache_key(conditions);
        if let Some(cache) = &self.query_cache {
            if let Some(cached) = cache.peek(&cache_key) {
                return cached.clone();
            }
        }
        let mut candidates: Option<Vec<Arc<Node>>> = None;
        for condition in conditions {
            let indexed_nodes = match condition {
                QueryCondition::ByType(type_name) => {
                    if self.should_use_type_index(type_name) {
                        Some(self.by_type_lazy(type_name))
                    } else {
                        None
                    }
                },
                QueryCondition::ByDepth(depth) => {
                    if self.should_use_depth_index(*depth) {
                        Some(self.by_depth_lazy(*depth))
                    } else {
                        None
                    }
                },
                QueryCondition::ByMark(mark_type) => {
                    if self.should_use_mark_index(mark_type) {
                        Some(self.by_mark_lazy(mark_type))
                    } else {
                        None
                    }
                },
                QueryCondition::ByAttr { .. }
                | QueryCondition::IsLeaf
                | QueryCondition::HasChildren => None,
            };
            if let Some(indexed) = indexed_nodes {
                candidates = Some(match candidates {
                    None => indexed,
                    Some(existing) => self.intersect_nodes(&existing, &indexed),
                });
            }
        }
        let pool: &NodePool = &self.pool;
        let result: Vec<Arc<Node>> = match candidates {
            Some(nodes) => nodes
                .into_par_iter()
                .filter(|node| Self::all_conditions_hold(pool, conditions, node))
                .collect(),
            None => pool
                .parallel_query(|node| Self::all_conditions_hold(pool, conditions, node)),
        };
        if let Some(cache) = &mut self.query_cache {
            cache.put(cache_key, result.clone());
        }
        result
    }

    /// Returns whether `node` satisfies every condition.
    ///
    /// `ByDepth` is resolved through `pool`, because `QueryCondition::matches`
    /// cannot see the tree and accepts any node for that variant. All other
    /// variants delegate to `matches`.
    fn all_conditions_hold(
        pool: &NodePool,
        conditions: &[QueryCondition],
        node: &Node,
    ) -> bool {
        conditions.iter().all(|condition| match condition {
            QueryCondition::ByDepth(depth) => {
                pool.get_node_depth(&node.id) == Some(*depth)
            },
            other => other.matches(node),
        })
    }

    /// Increments the request count for `type_name` and stamps the time.
    fn update_type_stats(
        &mut self,
        type_name: &str,
    ) {
        let stats = self
            .type_query_stats
            .entry(type_name.to_string())
            .or_insert_with(|| QueryStats { count: 0, last_query: Instant::now() });
        stats.count += 1;
        stats.last_query = Instant::now();
    }

    /// Increments the request count for `depth` and stamps the time.
    fn update_depth_stats(
        &mut self,
        depth: usize,
    ) {
        let stats = self
            .depth_query_stats
            .entry(depth)
            .or_insert_with(|| QueryStats { count: 0, last_query: Instant::now() });
        stats.count += 1;
        stats.last_query = Instant::now();
    }

    /// Increments the request count for `mark_type` and stamps the time.
    fn update_mark_stats(
        &mut self,
        mark_type: &str,
    ) {
        let stats = self
            .mark_query_stats
            .entry(mark_type.to_string())
            .or_insert_with(|| QueryStats { count: 0, last_query: Instant::now() });
        stats.count += 1;
        stats.last_query = Instant::now();
    }

    /// True once `type_name` has been requested at least `index_build_threshold` times.
    fn should_use_type_index(
        &self,
        type_name: &str,
    ) -> bool {
        self.type_query_stats
            .get(type_name)
            .map(|stats| stats.count >= self.config.index_build_threshold)
            .unwrap_or(false)
    }

    /// True once `depth` has been requested at least `index_build_threshold` times.
    fn should_use_depth_index(
        &self,
        depth: usize,
    ) -> bool {
        self.depth_query_stats
            .get(&depth)
            .map(|stats| stats.count >= self.config.index_build_threshold)
            .unwrap_or(false)
    }

    /// True once `mark_type` has been requested at least `index_build_threshold` times.
    fn should_use_mark_index(
        &self,
        mark_type: &str,
    ) -> bool {
        self.mark_query_stats
            .get(mark_type)
            .map(|stats| stats.count >= self.config.index_build_threshold)
            .unwrap_or(false)
    }

    /// Scans the pool for nodes of type `node_type`.
    fn build_type_index(
        &self,
        node_type: &str,
    ) -> Vec<Arc<Node>> {
        self.pool.parallel_query(|node| node.r#type == node_type)
    }

    /// Scans the pool for nodes at `target_depth`.
    fn build_depth_index(
        &self,
        target_depth: usize,
    ) -> Vec<Arc<Node>> {
        self.pool.parallel_query(|node| {
            self.pool
                .get_node_depth(&node.id)
                .map(|depth| depth == target_depth)
                .unwrap_or(false)
        })
    }

    /// Scans the pool for nodes carrying a `mark_type` mark.
    fn build_mark_index(
        &self,
        mark_type: &str,
    ) -> Vec<Arc<Node>> {
        self.pool.parallel_query(|node| {
            node.marks.iter().any(|mark| mark.r#type == mark_type)
        })
    }

    /// Cache key for a named query, scoped to this pool's key.
    fn generate_cache_key(
        &self,
        query_name: &str,
    ) -> String {
        use std::collections::hash_map::DefaultHasher;
        use std::hash::{Hash, Hasher};
        let mut hasher = DefaultHasher::new();
        query_name.hash(&mut hasher);
        self.pool.key().hash(&mut hasher);
        format!("query_{:x}", hasher.finish())
    }

    /// Cache key for a condition list (order-sensitive), scoped to this pool's key.
    fn generate_combined_cache_key(
        &self,
        conditions: &[QueryCondition],
    ) -> String {
        use std::collections::hash_map::DefaultHasher;
        use std::hash::{Hash, Hasher};
        let mut hasher = DefaultHasher::new();
        for condition in conditions {
            condition.cache_key().hash(&mut hasher);
        }
        self.pool.key().hash(&mut hasher);
        format!("combined_{:x}", hasher.finish())
    }

    /// Returns the nodes of `nodes2` whose id also occurs in `nodes1`, keeping `nodes2`'s order.
    fn intersect_nodes(
        &self,
        nodes1: &[Arc<Node>],
        nodes2: &[Arc<Node>],
    ) -> Vec<Arc<Node>> {
        let set1: HashSet<_> = nodes1.iter().map(|n| n.id.as_ref()).collect();
        nodes2
            .iter()
            .filter(|node| set1.contains(node.id.as_ref()))
            .cloned()
            .collect()
    }

    /// Snapshot of the request statistics and current cache sizes.
    pub fn get_query_stats(&self) -> QueryStatsSummary {
        QueryStatsSummary {
            type_queries: self.type_query_stats.clone(),
            depth_queries: self.depth_query_stats.clone(),
            mark_queries: self.mark_query_stats.clone(),
            cache_hit_rates: self.calculate_cache_hit_rates(),
        }
    }

    /// Current entry counts of every cache (0 for a disabled query cache).
    fn calculate_cache_hit_rates(&self) -> CacheHitRates {
        CacheHitRates {
            query_cache_size: self
                .query_cache
                .as_ref()
                .map(|c| c.len())
                .unwrap_or(0),
            type_index_cache_size: self.type_index_cache.len(),
            depth_index_cache_size: self.depth_index_cache.len(),
            mark_index_cache_size: self.mark_index_cache.len(),
        }
    }
}
/// Declarative node condition used by `LazyQueryEngine::combined_query`.
#[derive(Debug, Clone)]
pub enum QueryCondition {
/// The node's `type` equals the given name.
ByType(String),
/// The node is at the given depth (parent links to the root).
/// NOTE: `matches` cannot check this from the node alone and always returns `true` for it.
ByDepth(usize),
/// The node carries at least one mark of the given type.
ByMark(String),
/// The node's attribute `key` equals `value`.
ByAttr { key: String, value: serde_json::Value },
/// The node's `content` (child list) is empty.
IsLeaf,
/// The node's `content` (child list) is non-empty.
HasChildren,
}
impl QueryCondition {
    /// Tests this condition against `node` alone.
    ///
    /// `ByDepth` always yields `true`: depth depends on the node's position in
    /// a pool, which is not visible here.
    pub fn matches(
        &self,
        node: &Node,
    ) -> bool {
        match self {
            Self::ByType(expected) => node.r#type == *expected,
            Self::ByDepth(_) => true,
            Self::ByMark(wanted) => {
                node.marks.iter().any(|m| m.r#type == *wanted)
            },
            Self::ByAttr { key, value } => node.attrs.get(key) == Some(value),
            Self::IsLeaf => node.content.is_empty(),
            Self::HasChildren => !node.content.is_empty(),
        }
    }

    /// Returns a string that identifies this condition for result caching.
    pub fn cache_key(&self) -> String {
        match self {
            Self::ByType(name) => format!("type_{name}"),
            Self::ByDepth(depth) => format!("depth_{depth}"),
            Self::ByMark(name) => format!("mark_{name}"),
            Self::ByAttr { key, value } => {
                let encoded = serde_json::to_string(value).unwrap_or_default();
                format!("attr_{}_{}", key, encoded)
            },
            Self::IsLeaf => String::from("is_leaf"),
            Self::HasChildren => String::from("has_children"),
        }
    }
}
/// Snapshot returned by `LazyQueryEngine::get_query_stats`.
#[derive(Debug)]
pub struct QueryStatsSummary {
/// Request statistics per node type.
pub type_queries: HashMap<String, QueryStats>,
/// Request statistics per depth.
pub depth_queries: HashMap<usize, QueryStats>,
/// Request statistics per mark type.
pub mark_queries: HashMap<String, QueryStats>,
/// Current cache sizes (see `CacheHitRates`).
pub cache_hit_rates: CacheHitRates,
}
/// Current entry counts of the `LazyQueryEngine` caches.
/// Despite the name, these are sizes, not hit rates.
#[derive(Debug)]
pub struct CacheHitRates {
/// Entries in the query-result cache (0 when it is disabled).
pub query_cache_size: usize,
/// Entries in the type index cache.
pub type_index_cache_size: usize,
/// Entries in the depth index cache.
pub depth_index_cache_size: usize,
/// Entries in the mark index cache.
pub mark_index_cache_size: usize,
}
impl NodePool {
    /// Builds a [`LazyQueryEngine`] over a snapshot of this pool, configured by `config`.
    pub fn lazy_query(
        &self,
        config: LazyQueryConfig,
    ) -> LazyQueryEngine {
        let source: &NodePool = self;
        LazyQueryEngine::new(source, config)
    }
}