use crate::error::{ExecutionError, MongoshError, Result};
use bson::{doc, Bson, Document};
use futures::future::BoxFuture;
use mongodb::Client;
use tokio_util::sync::CancellationToken;
use uuid::Uuid;
#[derive(Debug, Clone)]
pub struct OperationHandle {
pub comment: String,
}
impl OperationHandle {
pub fn new(client_id: &str) -> Self {
Self {
comment: format!("mongosh-{}-{}", client_id, Uuid::new_v4()),
}
}
pub fn comment(&self) -> &str {
&self.comment
}
}
pub struct MongoOpKiller {
client: Client,
}
impl MongoOpKiller {
pub fn new(client: Client) -> Self {
Self { client }
}
pub async fn kill_by_comment(&self, handle: &OperationHandle) -> Result<()> {
let admin_db = self.client.database("admin");
let pipeline = vec![
doc! {
"$currentOp": {
"allUsers": true,
"localOps": true
}
},
doc! {
"$match": {
"command.comment": &handle.comment
}
},
];
let mut cursor = match admin_db.aggregate(pipeline).await {
Ok(cursor) => cursor,
Err(_e) => {
return Ok(());
}
};
use futures::stream::StreamExt;
if let Some(result) = cursor.next().await {
let op_doc = match result {
Ok(doc) => doc,
Err(_e) => return Ok(()),
};
let opid = match extract_opid(&op_doc) {
Ok(id) => id,
Err(_e) => return Ok(()),
};
let kill_result = admin_db
.run_command(doc! { "killOp": 1, "op": opid })
.await;
let _ = kill_result;
}
Ok(())
}
}
fn extract_opid(doc: &Document) -> Result<i64> {
if let Some(opid_bson) = doc.get("opid") {
match opid_bson {
Bson::Int32(v) => Ok(*v as i64),
Bson::Int64(v) => Ok(*v),
_ => Err(MongoshError::Generic(format!(
"Unexpected type for opid field: {:?}",
opid_bson
))),
}
} else {
Err(MongoshError::Generic(
"No opid field found in $currentOp result".to_string(),
))
}
}
pub async fn run_killable_command<F, T>(
client: Client,
client_id: &str,
cancel_token: CancellationToken,
exec_fn: F,
) -> Result<T>
where
F: FnOnce(Client, OperationHandle) -> BoxFuture<'static, Result<T>>,
{
let handle = OperationHandle::new(client_id);
let killer = MongoOpKiller::new(client.clone());
let command_fut = exec_fn(client, handle.clone());
tokio::select! {
result = command_fut => {
result
}
_ = cancel_token.cancelled() => {
let _ = killer.kill_by_comment(&handle).await;
Err(MongoshError::Execution(ExecutionError::Cancelled(
"Operation cancelled by user (Ctrl+C)".to_string()
)))
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_operation_handle_format() {
let handle = OperationHandle::new("test-client");
assert!(handle.comment.starts_with("mongosh-test-client-"));
assert!(handle.comment.len() > "mongosh-test-client-".len());
}
#[test]
fn test_operation_handle_uniqueness() {
let handle1 = OperationHandle::new("test");
let handle2 = OperationHandle::new("test");
assert_ne!(handle1.comment, handle2.comment);
}
#[test]
fn test_extract_opid_i32() {
let doc = doc! { "opid": 12345i32 };
let opid = extract_opid(&doc).unwrap();
assert_eq!(opid, 12345i64);
}
#[test]
fn test_extract_opid_i64() {
let doc = doc! { "opid": 9876543210i64 };
let opid = extract_opid(&doc).unwrap();
assert_eq!(opid, 9876543210i64);
}
#[test]
fn test_extract_opid_missing() {
let doc = doc! { "other": "field" };
let result = extract_opid(&doc);
assert!(result.is_err());
}
#[test]
fn test_extract_opid_wrong_type() {
let doc = doc! { "opid": "string" };
let result = extract_opid(&doc);
assert!(result.is_err());
}
}