use crate::error::{MongoshError, Result};
use crate::parser::UtilityCommand;
use tracing::info;
use super::context::ExecutionContext;
use super::result::{ExecutionResult, ExecutionStats, ResultData};
pub struct UtilityExecutor {
context: ExecutionContext,
}
impl UtilityExecutor {
pub fn new(context: ExecutionContext) -> Self {
Self { context }
}
pub async fn execute(&self, cmd: UtilityCommand) -> Result<ExecutionResult> {
match cmd {
UtilityCommand::Print(text) => Ok(ExecutionResult {
success: true,
data: ResultData::Message(text),
stats: ExecutionStats::default(),
error: None,
}),
UtilityCommand::Iterate => self.execute_iterate().await,
}
}
async fn execute_iterate(&self) -> Result<ExecutionResult> {
use futures::stream::TryStreamExt;
let mut cursor_guard = self.context.shared_state.get_cursor_mut().await;
let cursor_state = cursor_guard.as_mut().ok_or_else(|| {
MongoshError::Generic("No active cursor. Please run a query first.".to_string())
})?;
if cursor_state.is_expired() {
*cursor_guard = None;
drop(cursor_guard);
self.context.shared_state.clear_cursor().await;
return Err(MongoshError::Generic(
"Cursor has expired (10 minute timeout). Please re-run your query.".to_string(),
)
.into());
}
let batch_size = cursor_state.batch_size;
let mut documents = Vec::new();
let mut count = 0;
while count < batch_size as usize {
match cursor_state.cursor.try_next().await {
Ok(Some(doc)) => {
documents.push(doc);
count += 1;
}
Ok(None) => {
break;
}
Err(e) => {
*cursor_guard = None;
drop(cursor_guard);
self.context.shared_state.clear_cursor().await;
return Err(
crate::error::ExecutionError::CursorError(e.to_string()).into()
);
}
}
}
info!("Retrieved {} documents from cursor", count);
cursor_state.update_retrieved(count);
let has_more = count == batch_size as usize;
if !has_more {
let total_retrieved = cursor_state.documents_retrieved();
*cursor_guard = None;
drop(cursor_guard);
self.context.shared_state.clear_cursor().await;
info!("Cursor exhausted. Total {} documents retrieved.", total_retrieved);
}
let result_data = if has_more {
ResultData::DocumentsWithPagination {
documents,
has_more: true,
displayed: count,
}
} else {
ResultData::Documents(documents)
};
Ok(ExecutionResult {
success: true,
data: result_data,
stats: ExecutionStats {
execution_time_ms: 0,
documents_returned: count,
documents_affected: None,
},
error: None,
})
}
}
impl Default for UtilityExecutor {
fn default() -> Self {
Self::new(ExecutionContext::new(
crate::connection::ConnectionManager::new(
"mongodb://localhost:27017".to_string(),
crate::config::ConnectionConfig::default(),
),
crate::repl::SharedState::new("test".to_string()),
))
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_utility_executor_print() {
let executor = UtilityExecutor::default();
let result = executor
.execute(UtilityCommand::Print("Hello".to_string()))
.await
.unwrap();
assert!(result.success);
match result.data {
ResultData::Message(msg) => assert_eq!(msg, "Hello"),
_ => panic!("Expected Message result"),
}
}
#[test]
fn test_utility_executor_default() {
let _executor = UtilityExecutor::default();
}
}