use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;
use bson::Bson;
use crate::vm::vm_external_func::{VmExternalFunc, VmExternalFuncStatus};
use crate::{Result, Error};
use indexmap::IndexMap;
use crate::vm::operators::{OpRegistry, VmOperator};
/// Name returned by `VmFuncGroup` from `VmExternalFunc::name`.
const NAME: &str = "group";
/// State behind the `group` external function: a completion flag plus the
/// mutex-guarded per-field values and operators built by `VmFuncGroup::compile`.
pub(crate) struct VmFuncGroup {
/// Starts as `false`; `call` stores `true` once it receives a null argument.
is_completed: AtomicBool,
/// Mutable grouping state; `call` locks it for the whole invocation.
inner: Mutex<VmFuncGroupInner>,
}
/// Mutable part of a `VmFuncGroup`, kept behind its mutex.
struct VmFuncGroupInner {
/// Current value of every output field, in insertion order. `_id` holds a
/// clone of the `_id` entry from the `$group` spec; every other field holds
/// its operator's running value (seeded with `initial_value()` and replaced
/// by the result of `next()` on each non-null `call`).
group_values: IndexMap<String, Bson>,
/// Compiled operator for each non-`_id` output field, keyed by field name.
operators: HashMap<String, Box<dyn VmOperator>>,
}
impl VmFuncGroup {
fn compile_command(
paths: &mut Vec<String>,
registry: OpRegistry,
key: &str,
value: &Bson,
group_values: &mut IndexMap<String, Bson>,
operators: &mut HashMap<String, Box<dyn VmOperator>>,
) -> Result<()> {
let op = registry.compile(paths, value)?;
group_values.insert(key.into(), op.initial_value());
operators.insert(key.into(), op);
Ok(())
}
pub(crate) fn compile(
paths: &mut Vec<String>,
registry: OpRegistry,
value: &Bson,
) -> Result<Box<dyn VmExternalFunc>> {
let doc = crate::try_unwrap_document!("$group", value);
let mut group_values = IndexMap::new();
let mut operators = HashMap::new();
let mut found_id = false;
for (k, v) in doc.iter() {
crate::path_hint_2!(paths, k.clone(), {
group_values.insert(k.clone(), v.clone());
let k_str = k.as_str();
if k_str == "_id" {
found_id = true;
paths.pop();
continue;
}
VmFuncGroup::compile_command(
paths,
registry.clone(),
k_str,
v,
&mut group_values,
&mut operators,
)?;
});
}
if !found_id {
let err_msg = "Field '_id' is required for $group".to_string();
return Err(Error::ValidationError(err_msg));
}
let result = VmFuncGroup {
is_completed: AtomicBool::new(false),
inner: Mutex::new(VmFuncGroupInner {
group_values,
operators,
}),
};
Ok(Box::new(result))
}
}
impl VmExternalFunc for VmFuncGroup {
    /// Returns the function's name, the `NAME` constant (`"group"`).
    fn name(&self) -> &str {
        NAME
    }

    /// Feeds one argument into the group, or finalizes it.
    ///
    /// * When `args[0]` is BSON null, the completion flag is set and a
    ///   document is returned as `VmExternalFuncStatus::Next`. It contains
    ///   the stored `_id` value plus `complete()` of every field's operator,
    ///   in field order.
    /// * Otherwise every non-`_id` field's current value is replaced by the
    ///   result of its operator's `next(current)`, and
    ///   `VmExternalFuncStatus::Continue` is returned.
    ///
    /// # Panics
    ///
    /// Panics if `args` is empty, if the internal mutex is poisoned, or if a
    /// field has no compiled operator (which `compile` never produces).
    fn call(&self, args: &[Bson]) -> Result<VmExternalFuncStatus> {
        let arg0 = &args[0];
        let mut inner = self.inner.lock().unwrap();

        if arg0.as_null().is_some() {
            // Null argument: finalize and emit the grouped document.
            self.is_completed.store(true, Ordering::Relaxed);

            let mut result = bson::Document::new();
            for (k, value) in inner.group_values.iter() {
                let output = if k == "_id" {
                    value.clone()
                } else {
                    inner
                        .operators
                        .get(k)
                        .expect("Operator not found")
                        .complete()
                };
                result.insert(k.clone(), output);
            }
            return Ok(VmExternalFuncStatus::Next(result.into()));
        }

        // Non-null argument: advance every operator by one step.
        let next_map = inner
            .group_values
            .iter()
            .map(|(k, v)| {
                if k == "_id" {
                    return (k.clone(), v.clone());
                }
                let op = inner.operators.get(k).expect("Operator not found");
                (k.clone(), op.next(v))
            })
            .collect::<IndexMap<String, Bson>>();
        inner.group_values = next_map;

        Ok(VmExternalFuncStatus::Continue)
    }

    /// Reports whether `call` has already received the null argument.
    ///
    /// Reads the flag that `call` sets, instead of answering `true`
    /// unconditionally, so a freshly compiled group is not reported as
    /// completed before it has consumed any input.
    fn is_completed(&self) -> bool {
        self.is_completed.load(Ordering::Relaxed)
    }
}