use crate::api::SoqlQueryBuilder;
use crate::auth::Authenticator;
use crate::client::ForceClient;
use crate::error::{ForceError, Result};
use crate::types::DynamicSObject;
use serde_json::Value;
use super::query_batch::{BatchOp, BatchStats, QueryBatch};
#[derive(Debug)]
pub struct SoqlMassOp<'a, A: Authenticator> {
client: &'a ForceClient<A>,
query: SoqlQueryBuilder,
halt_on_error: bool,
}
impl<'a, A: Authenticator> SoqlMassOp<'a, A> {
#[must_use]
pub fn new(client: &'a ForceClient<A>, query: SoqlQueryBuilder) -> Self {
Self {
client,
query,
halt_on_error: false,
}
}
#[must_use]
pub fn halt_on_error(mut self, halt: bool) -> Self {
self.halt_on_error = halt;
self
}
pub async fn delete_all(self) -> Result<BatchStats> {
let qstr = self.query.try_build()?;
let batch_op = QueryBatch::new(self.client, qstr).halt_on_error(self.halt_on_error);
batch_op
.run::<DynamicSObject, _>(|record| {
let id = record
.get_field_as::<String>("Id")
.ok()
.flatten()
.unwrap_or_default();
if id.is_empty() {
None
} else {
Some(BatchOp::Delete(record.object_type().to_string(), id))
}
})
.await
}
pub async fn update_all(self, updates: Value) -> Result<BatchStats> {
if !updates.is_object() {
return Err(ForceError::InvalidInput(
"Updates must be a JSON object".to_string(),
));
}
let qstr = self.query.try_build()?;
let batch_op = QueryBatch::new(self.client, qstr).halt_on_error(self.halt_on_error);
batch_op
.run::<DynamicSObject, _>(|record| {
let id = record
.get_field_as::<String>("Id")
.ok()
.flatten()
.unwrap_or_default();
if id.is_empty() {
None
} else {
Some(BatchOp::Update(
record.object_type().to_string(),
id,
updates.clone(),
))
}
})
.await
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::client::builder;
use crate::test_support::{MockAuthenticator, Must};
use serde_json::json;
use wiremock::matchers::{method, path, query_param};
use wiremock::{Mock, MockServer, ResponseTemplate};
async fn create_mock_server() -> MockServer {
MockServer::start().await
}
async fn create_test_client(mock_server: &MockServer) -> ForceClient<MockAuthenticator> {
let auth = MockAuthenticator::new("test_token", &mock_server.uri());
builder().authenticate(auth).build().await.must()
}
#[tokio::test]
async fn test_mass_delete_success() {
let mock_server = create_mock_server().await;
let client = create_test_client(&mock_server).await;
Mock::given(method("GET"))
.and(path("/services/data/v60.0/query"))
.and(query_param("q", "SELECT Id FROM Account WHERE Name LIKE 'Test %'"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"totalSize": 2,
"done": true,
"records": [
{ "attributes": { "type": "Account", "url": "/services/data/v60.0/sobjects/Account/001000000000001AAA" }, "Id": "001000000000001AAA" },
{ "attributes": { "type": "Account", "url": "/services/data/v60.0/sobjects/Account/001000000000002AAA" }, "Id": "001000000000002AAA" }
]
})))
.mount(&mock_server)
.await;
Mock::given(method("POST"))
.and(path("/services/data/v60.0/composite/batch"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"hasErrors": false,
"results": [
{ "statusCode": 204, "result": null },
{ "statusCode": 204, "result": null }
]
})))
.mount(&mock_server)
.await;
let query = SoqlQueryBuilder::new()
.select(&["Id"])
.from("Account")
.where_like("Name", "Test %");
let op = SoqlMassOp::new(&client, query);
let stats = op.delete_all().await.must();
assert_eq!(stats.records_processed, 2);
assert_eq!(stats.ops_succeeded, 2);
assert_eq!(stats.ops_failed, 0);
}
#[tokio::test]
async fn test_mass_update_success() {
let mock_server = create_mock_server().await;
let client = create_test_client(&mock_server).await;
Mock::given(method("GET"))
.and(path("/services/data/v60.0/query"))
.and(query_param("q", "SELECT Id FROM Contact WHERE Status = 'New'"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"totalSize": 1,
"done": true,
"records": [
{ "attributes": { "type": "Contact", "url": "/services/data/v60.0/sobjects/Contact/003000000000001AAA" }, "Id": "003000000000001AAA" }
]
})))
.mount(&mock_server)
.await;
Mock::given(method("POST"))
.and(path("/services/data/v60.0/composite/batch"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"hasErrors": false,
"results": [
{ "statusCode": 204, "result": null }
]
})))
.mount(&mock_server)
.await;
let query = SoqlQueryBuilder::new()
.select(&["Id"])
.from("Contact")
.where_eq("Status", "New");
let op = SoqlMassOp::new(&client, query);
let updates = json!({
"Status": "Processed"
});
let stats = op.update_all(updates).await.must();
assert_eq!(stats.records_processed, 1);
assert_eq!(stats.ops_succeeded, 1);
assert_eq!(stats.ops_failed, 0);
}
#[tokio::test]
async fn test_delete_all_missing_id_is_skipped() {
let mock_server = create_mock_server().await;
let client = create_test_client(&mock_server).await;
Mock::given(method("GET"))
.and(path("/services/data/v60.0/query"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"totalSize": 1,
"done": true,
"records": [
{ "attributes": { "type": "Account", "url": "/services/data/v60.0/sobjects/Account/001000000000001AAA" }, "Name": "Test" }
]
})))
.mount(&mock_server)
.await;
let query = SoqlQueryBuilder::new().select(&["Name"]).from("Account");
let op = SoqlMassOp::new(&client, query);
let stats = op.delete_all().await.must();
assert_eq!(stats.ops_succeeded, 0);
assert_eq!(stats.ops_failed, 0);
}
#[tokio::test]
async fn test_update_all_missing_id_is_skipped() {
let mock_server = create_mock_server().await;
let client = create_test_client(&mock_server).await;
Mock::given(method("GET"))
.and(path("/services/data/v60.0/query"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"totalSize": 1,
"done": true,
"records": [
{ "attributes": { "type": "Account", "url": "/services/data/v60.0/sobjects/Account/001000000000001AAA" }, "Name": "Test" }
]
})))
.mount(&mock_server)
.await;
let query = SoqlQueryBuilder::new().select(&["Name"]).from("Account");
let op = SoqlMassOp::new(&client, query);
let stats = op.update_all(json!({"Name": "Updated"})).await.must();
assert_eq!(stats.ops_succeeded, 0);
assert_eq!(stats.ops_failed, 0);
}
}