use crate::error::{CirrusError, CirrusResult, SalesforceError};
use reqwest::header::HeaderMap;
use serde::Deserialize;
use serde::Serialize;
use serde::de::DeserializeOwned;
use std::collections::HashMap;
/// One page of records returned by the REST `query`/`queryAll` endpoints.
#[derive(Debug, Clone, Deserialize)]
pub struct QueryResult<R> {
    /// Total rows matched by the SOQL query, across all pages.
    #[serde(rename = "totalSize")]
    pub total_size: i64,
    /// `true` when this page is the final one.
    pub done: bool,
    /// Relative URL of the next page; present only while `done` is `false`.
    #[serde(rename = "nextRecordsUrl", default)]
    pub next_records_url: Option<String>,
    /// Records in this page; empty when the key is absent from the payload.
    #[serde(default)]
    pub records: Vec<R>,
}
/// Result of a SOSL search via the REST `search` endpoint.
///
/// The original `skip_serializing_if` attribute was dropped: it only affects
/// `Serialize`, which this type does not derive, so it was inert.
#[derive(Debug, Clone, Deserialize)]
pub struct SearchResult<R> {
    /// Matching records across all searched objects; empty when absent.
    #[serde(rename = "searchRecords", default)]
    pub search_records: Vec<R>,
    /// Optional search metadata, present when the SOSL query requests it.
    #[serde(default)]
    pub metadata: Option<serde_json::Value>,
}
/// Response from creating a single SObject (`POST /sobjects/{type}`).
///
/// The original `skip_serializing_if` attribute was dropped: it is a no-op on
/// a type that only derives `Deserialize`.
#[derive(Debug, Clone, Deserialize)]
pub struct SObjectCreateResult {
    /// Id of the newly created (or upserted) record.
    pub id: String,
    /// Whether the operation succeeded.
    pub success: bool,
    /// Errors reported for the record; empty on success.
    #[serde(default)]
    pub errors: Vec<SalesforceError>,
    /// For upserts: `true` when the record was created rather than updated.
    #[serde(default)]
    pub created: Option<bool>,
}
/// One org limit from the `/limits` endpoint: its ceiling, what is left, and
/// any nested sub-limits.
#[derive(Debug, Clone, Deserialize)]
pub struct Limit {
#[serde(rename = "Max")]
pub max: i64,
#[serde(rename = "Remaining")]
pub remaining: i64,
/// Any extra keys in the limit object (e.g. `CreateCustom`) are captured
/// here as nested limits via `flatten`.
#[serde(flatten)]
pub nested: HashMap<String, Limit>,
}
/// Full `/limits` response: limit name -> limit details.
pub type OrgLimits = HashMap<String, Limit>;
/// API usage extracted from the `api-usage=<used>/<allowed>` response header.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct LimitInfo {
    /// Calls consumed in the current window.
    pub used: u32,
    /// Total calls allowed in the window.
    pub allowed: u32,
}
impl LimitInfo {
    /// Parses a header value of the form `api-usage=<used>/<allowed>`.
    ///
    /// Returns `None` when the prefix is missing, the `/` separator is
    /// absent, or either side fails to parse as an unsigned integer.
    pub fn parse(header_value: &str) -> Option<Self> {
        let payload = header_value.trim().strip_prefix("api-usage=")?;
        let mut halves = payload.splitn(2, '/');
        let used = halves.next()?.trim().parse().ok()?;
        let allowed = halves.next()?.trim().parse().ok()?;
        Some(Self { used, allowed })
    }
    /// Calls still available; clamps at zero instead of underflowing.
    pub fn remaining(&self) -> u32 {
        if self.allowed > self.used {
            self.allowed - self.used
        } else {
            0
        }
    }
}
/// Response of the global describe endpoint (`GET /sobjects`).
#[derive(Debug, Clone, Deserialize)]
pub struct DescribeGlobal {
pub encoding: String,
#[serde(rename = "maxBatchSize")]
pub max_batch_size: i32,
/// Metadata summary for every object visible to the caller.
pub sobjects: Vec<SObjectMetadata>,
}
/// Per-object metadata entry from the global describe.
///
/// All JSON keys are camelCase, so a single container-level `rename_all`
/// replaces the per-field `rename` attributes of the original mapping.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SObjectMetadata {
    pub activateable: bool,
    pub createable: bool,
    pub custom: bool,
    pub custom_setting: bool,
    pub deletable: bool,
    pub deprecated_and_hidden: bool,
    pub feed_enabled: bool,
    /// Three-character record-id prefix; absent for objects without one.
    #[serde(default)]
    pub key_prefix: Option<String>,
    pub label: String,
    pub label_plural: String,
    pub layoutable: bool,
    pub mergeable: bool,
    pub mru_enabled: bool,
    pub name: String,
    pub queryable: bool,
    pub replicateable: bool,
    pub retrieveable: bool,
    pub searchable: bool,
    pub triggerable: bool,
    pub undeletable: bool,
    pub updateable: bool,
    /// Resource URLs keyed by kind (`sobject`, `describe`, `rowTemplate`, ...).
    #[serde(default)]
    pub urls: HashMap<String, String>,
}
/// Operation kind for a Bulk API 2.0 job.
///
/// Wire names are camelCase (`"insert"`, `"hardDelete"`, `"queryAll"`, ...),
/// so one container-level `rename_all` covers every variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum BulkOperation {
    Insert,
    Update,
    Upsert,
    Delete,
    HardDelete,
    Query,
    QueryAll,
}
/// Lifecycle state of a Bulk API 2.0 job. The serialized names match the
/// variant names exactly (e.g. `"JobComplete"`), so no renames are needed.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum BulkJobState {
Open,
UploadComplete,
InProgress,
JobComplete,
Aborted,
Failed,
}
/// Line ending used in Bulk API CSV payloads; defaults to `LF`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
pub enum BulkLineEnding {
#[default]
LF,
CRLF,
}
/// Column delimiter for Bulk API CSV payloads; the wire format is the
/// uppercased variant name (e.g. `"COMMA"`). Comma is the default.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(rename_all = "UPPERCASE")]
pub enum BulkColumnDelimiter {
Backquote,
Caret,
#[default]
Comma,
Pipe,
Semicolon,
Tab,
}
/// State of a Bulk API 2.0 ingest job (`/jobs/ingest`).
///
/// Every JSON key is camelCase, so a container-level `rename_all` replaces
/// the per-field `rename` attributes of the original definition.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct BulkIngestJob {
    pub id: String,
    pub operation: BulkOperation,
    pub object: String,
    pub state: BulkJobState,
    /// Present only for `upsert` jobs.
    #[serde(default)]
    pub external_id_field_name: Option<String>,
    pub line_ending: BulkLineEnding,
    pub column_delimiter: BulkColumnDelimiter,
    pub content_type: String,
    /// Where to upload CSV batches; present while the job accepts data.
    #[serde(default)]
    pub content_url: Option<String>,
    pub api_version: f32,
    pub job_type: String,
    pub concurrency_mode: String,
    pub created_by_id: String,
    pub created_date: String,
    pub system_modstamp: String,
    #[serde(default)]
    pub assignment_rule_id: Option<String>,
    /// Populated once the job has started processing records.
    #[serde(default)]
    pub number_records_processed: Option<i64>,
    #[serde(default)]
    pub number_records_failed: Option<i64>,
    #[serde(default)]
    pub retries: Option<i32>,
    #[serde(default)]
    pub total_processing_time: Option<i64>,
    #[serde(default)]
    pub api_active_processing_time: Option<i64>,
    #[serde(default)]
    pub apex_processing_time: Option<i64>,
    /// Set when `state` is `Failed`.
    #[serde(default)]
    pub error_message: Option<String>,
}
/// State of a Bulk API 2.0 query job (`/jobs/query`).
///
/// Every JSON key is camelCase, so a container-level `rename_all` replaces
/// the per-field `rename` attributes of the original definition.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct BulkQueryJob {
    pub id: String,
    pub operation: BulkOperation,
    pub state: BulkJobState,
    pub object: String,
    pub line_ending: BulkLineEnding,
    pub column_delimiter: BulkColumnDelimiter,
    pub content_type: String,
    pub api_version: f32,
    /// Absent on some job-creation responses.
    #[serde(default)]
    pub job_type: Option<String>,
    pub concurrency_mode: String,
    pub created_by_id: String,
    pub created_date: String,
    pub system_modstamp: String,
    #[serde(default)]
    pub number_records_processed: Option<i64>,
    #[serde(default)]
    pub retries: Option<i32>,
    #[serde(default)]
    pub total_processing_time: Option<i64>,
    #[serde(default)]
    pub is_pk_chunking_supported: Option<bool>,
    /// Set when `state` is `Failed`.
    #[serde(default)]
    pub error_message: Option<String>,
}
/// One page of CSV output fetched from a bulk query job.
#[derive(Debug, Clone)]
pub struct BulkQueryResults {
/// Raw CSV bytes for this page.
pub csv: bytes::Bytes,
/// Pagination locator for the next page; `None` on the last page.
/// NOTE(review): presumably taken from the `Sforce-Locator` response
/// header — confirm in the fetching code.
pub locator: Option<String>,
/// Record count for this page, when the server reports one.
pub number_of_records: Option<i64>,
}
/// Row of the `EventLogFile` sObject as returned by SOQL.
///
/// Field names on the wire are PascalCase, so one container-level
/// `rename_all` stands in for the per-field `rename` attributes.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct EventLogFileRecord {
    pub id: String,
    pub event_type: String,
    /// URL path of the log-file content.
    pub log_file: String,
    pub log_date: String,
    #[serde(default)]
    pub log_file_length: Option<f64>,
    #[serde(default)]
    pub interval: Option<String>,
    #[serde(default)]
    pub sequence: Option<i32>,
    #[serde(default)]
    pub created_date: Option<String>,
}
/// One entry from the `/services/data` API version listing.
#[derive(Debug, Clone, Deserialize)]
pub struct ApiVersion {
    pub label: String,
    pub url: String,
    /// Dotted version string such as `"60.0"`.
    pub version: String,
}
impl ApiVersion {
    /// Splits `version` into `(major, minor)`; `None` when it is not a
    /// dotted pair of unsigned integers.
    pub fn version_number(&self) -> Option<(u32, u32)> {
        let mut parts = self.version.splitn(2, '.');
        let major = parts.next()?.parse().ok()?;
        let minor = parts.next()?.parse().ok()?;
        Some((major, minor))
    }
    /// Picks the highest version. Entries with unparseable versions sort
    /// lowest because `None < Some(_)`; an empty slice yields `None`.
    pub fn latest(versions: &[Self]) -> Option<&Self> {
        versions.iter().max_by_key(|v| v.version_number())
    }
}
/// Envelope of a composite batch call (`/composite/batch`).
#[derive(Debug, Clone, Deserialize)]
pub struct BatchResponse {
    /// `true` when at least one subrequest failed.
    #[serde(rename = "hasErrors")]
    pub has_errors: bool,
    /// One entry per subrequest, in request order; empty when absent.
    #[serde(default)]
    pub results: Vec<BatchSubresult>,
}
/// Outcome of one subrequest inside a composite batch response.
#[derive(Debug, Clone, Deserialize)]
pub struct BatchSubresult {
    /// HTTP status code of the subrequest.
    #[serde(rename = "statusCode")]
    pub status_code: u16,
    /// Subrequest body: an object/array on success, an error array on
    /// failure, or JSON `null` (the default) for empty responses.
    #[serde(default)]
    pub result: serde_json::Value,
}
impl BatchSubresult {
    /// `true` for any 2xx status code.
    pub fn is_success(&self) -> bool {
        matches!(self.status_code, 200..=299)
    }
}
/// Envelope of an SObject Tree request (`/composite/tree`).
#[derive(Debug, Clone, Deserialize)]
pub struct CompositeTreeResponse {
    /// `true` when any record in the tree failed to save.
    #[serde(rename = "hasErrors")]
    pub has_errors: bool,
    /// Per-record outcomes; empty when the key is absent.
    #[serde(default)]
    pub results: Vec<CompositeTreeResult>,
}
/// Per-record outcome in an SObject Tree response.
///
/// The original `skip_serializing_if` attributes were dropped: they only
/// affect `Serialize`, which this type does not derive, so they were inert.
#[derive(Debug, Clone, Deserialize)]
pub struct CompositeTreeResult {
    #[serde(rename = "referenceId")]
    pub reference_id: String,
    /// Created record id; present only on success.
    #[serde(default)]
    pub id: Option<String>,
    /// Save errors; present only on failure.
    #[serde(default)]
    pub errors: Option<Vec<CompositeError>>,
}
impl CompositeTreeResult {
    /// A record saved successfully when it got an id and reported no errors.
    pub fn is_success(&self) -> bool {
        self.id.is_some() && self.errors.is_none()
    }
}
/// Error entry used by composite tree and SObject-collection results. The
/// status code here is a Salesforce error-code string (e.g.
/// `"INVALID_EMAIL_ADDRESS"`), not an HTTP status.
#[derive(Debug, Clone, Deserialize)]
pub struct CompositeError {
#[serde(rename = "statusCode")]
pub status_code: String,
pub message: String,
/// Fields implicated in the error; empty when not field-specific.
#[serde(default)]
pub fields: Vec<String>,
}
/// Envelope of a composite request (`/composite`).
#[derive(Debug, Clone, Deserialize)]
pub struct CompositeResponse {
    /// Subresponses in request order; empty when the key is absent.
    #[serde(rename = "compositeResponse", default)]
    pub composite_response: Vec<CompositeSubresponse>,
}
/// One subresponse inside a composite response.
#[derive(Debug, Clone, Deserialize)]
pub struct CompositeSubresponse {
    /// Subrequest body; JSON `null` (the default) for bodiless responses
    /// such as 204s.
    #[serde(default)]
    pub body: serde_json::Value,
    /// Response headers, decoded into a case-insensitive `HeaderMap`.
    #[serde(rename = "httpHeaders", default, with = "http_serde::header_map")]
    pub http_headers: HeaderMap,
    #[serde(rename = "httpStatusCode")]
    pub http_status_code: u16,
    #[serde(rename = "referenceId")]
    pub reference_id: String,
}
impl CompositeSubresponse {
    /// `true` for any 2xx status code.
    pub fn is_success(&self) -> bool {
        matches!(self.http_status_code, 200..=299)
    }
}
/// Per-record result from the SObject Collections API
/// (`/composite/sobjects`).
///
/// The original `skip_serializing_if` attributes were dropped: they are
/// inert on a type that only derives `Deserialize`.
#[derive(Debug, Clone, Deserialize)]
pub struct SObjectCollectionResult {
    /// Record id; absent when the operation failed for this record.
    #[serde(default)]
    pub id: Option<String>,
    /// Whether the operation succeeded for this record.
    pub success: bool,
    /// Errors for this record; empty on success.
    #[serde(default)]
    pub errors: Vec<CompositeError>,
    /// For upserts: whether the record was created rather than updated.
    #[serde(default)]
    pub created: Option<bool>,
}
/// Result of the Tooling API `executeAnonymous` endpoint.
///
/// JSON keys are camelCase, so a container-level `rename_all` replaces the
/// per-field `rename` attributes of the original definition.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ExecuteAnonymousResult {
    /// Whether the Apex source compiled.
    pub compiled: bool,
    /// Compiler diagnostic when `compiled` is `false`.
    #[serde(default)]
    pub compile_problem: Option<String>,
    /// Whether the code ran without throwing.
    pub success: bool,
    pub line: i32,
    pub column: i32,
    #[serde(default)]
    pub exception_message: Option<String>,
    #[serde(default)]
    pub exception_stack_trace: Option<String>,
}
/// Deserializes an HTTP response body into `R`.
///
/// A 2xx status deserializes the body as JSON; an empty body is treated as
/// JSON `null`, so `R = Option<_>` handles 204 responses. Any non-2xx
/// status is converted into an error via [`parse_error_response`].
pub(crate) fn parse_response_bytes<R: DeserializeOwned>(
    status: u16,
    bytes: &[u8],
) -> CirrusResult<R> {
    if !(200..300).contains(&status) {
        return Err(parse_error_response(status, bytes));
    }
    // Substitute `null` for empty bodies so deserialization still succeeds.
    let payload: &[u8] = if bytes.is_empty() { b"null" } else { bytes };
    serde_json::from_slice(payload).map_err(CirrusError::Serialization)
}
/// Builds a `CirrusError::Api` from a non-2xx response body.
///
/// Salesforce error bodies are JSON arrays of error objects. When the body
/// does not parse as such (HTML error pages, proxies, ...), the raw body is
/// preserved as lossily-decoded UTF-8 so the caller still sees something.
pub(crate) fn parse_error_response(status: u16, bytes: &[u8]) -> CirrusError {
    let errors: Vec<SalesforceError> = serde_json::from_slice(bytes).unwrap_or_default();
    // Keep the raw text only when no structured errors were extracted.
    let raw = errors
        .is_empty()
        .then(|| String::from_utf8_lossy(bytes).into_owned());
    CirrusError::Api { status, errors, raw }
}
// Unit tests: JSON round-trips for every response model above, plus the
// header-parsing and response-parsing helpers.
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
use super::*;
use serde_json::Value;
use serde_json::json;
// --- parse_response_bytes: success, pagination, and error mapping ---
#[test]
fn parses_success_into_value() {
let body = json!({"Id": "001xx", "Name": "Acme"}).to_string();
let parsed: Value = parse_response_bytes(200, body.as_bytes()).unwrap();
assert_eq!(parsed["Id"], "001xx");
}
#[test]
fn parses_query_result() {
let body = json!({
"totalSize": 2,
"done": true,
"records": [
{"Id": "1", "Name": "A"},
{"Id": "2", "Name": "B"}
]
})
.to_string();
let qr: QueryResult<Value> = parse_response_bytes(200, body.as_bytes()).unwrap();
assert_eq!(qr.total_size, 2);
assert!(qr.done);
assert_eq!(qr.records.len(), 2);
assert!(qr.next_records_url.is_none());
}
#[test]
fn parses_paginated_query_result() {
let body = json!({
"totalSize": 1500,
"done": false,
"nextRecordsUrl": "/services/data/v66.0/query/01g...-2000",
"records": []
})
.to_string();
let qr: QueryResult<Value> = parse_response_bytes(200, body.as_bytes()).unwrap();
assert!(!qr.done);
assert_eq!(
qr.next_records_url.as_deref(),
Some("/services/data/v66.0/query/01g...-2000")
);
}
#[test]
fn parses_error_array_into_api_error() {
let body = r#"[{"message":"No such column","errorCode":"INVALID_FIELD","fields":["Foo"]}]"#;
let err = parse_response_bytes::<Value>(400, body.as_bytes()).unwrap_err();
match err {
CirrusError::Api {
status,
errors,
raw,
} => {
assert_eq!(status, 400);
assert_eq!(errors.len(), 1);
assert_eq!(errors[0].error_code, "INVALID_FIELD");
assert!(raw.is_none());
}
other => panic!("expected Api error, got {other:?}"),
}
}
#[test]
fn falls_back_to_raw_when_error_body_is_unparseable() {
let body = "<html>Internal Server Error</html>";
let err = parse_response_bytes::<Value>(500, body.as_bytes()).unwrap_err();
match err {
CirrusError::Api {
status,
errors,
raw,
} => {
assert_eq!(status, 500);
assert!(errors.is_empty());
assert_eq!(raw.as_deref(), Some(body));
}
other => panic!("expected Api error, got {other:?}"),
}
}
#[test]
fn empty_2xx_body_is_treated_as_null() {
let parsed: Option<Value> = parse_response_bytes(204, b"").unwrap();
assert!(parsed.is_none());
}
// --- Limits deserialization, including nested flattened sub-limits ---
#[test]
fn parses_flat_limit() {
let body = r#"{"Max": 5000, "Remaining": 4937}"#;
let limit: Limit = parse_response_bytes(200, body.as_bytes()).unwrap();
assert_eq!(limit.max, 5000);
assert_eq!(limit.remaining, 4937);
assert!(limit.nested.is_empty());
}
#[test]
fn parses_nested_limit() {
let body = r#"{
"Max": 1500,
"Remaining": 1499,
"CreateCustom": {"Max": 1000, "Remaining": 999}
}"#;
let limit: Limit = parse_response_bytes(200, body.as_bytes()).unwrap();
assert_eq!(limit.max, 1500);
assert_eq!(limit.remaining, 1499);
assert_eq!(limit.nested.len(), 1);
let nested = limit.nested.get("CreateCustom").unwrap();
assert_eq!(nested.max, 1000);
assert_eq!(nested.remaining, 999);
assert!(nested.nested.is_empty());
}
#[test]
fn parses_org_limits_envelope() {
let body = json!({
"DailyApiRequests": {"Max": 5000, "Remaining": 4937},
"PermissionSets": {
"Max": 1500,
"Remaining": 1499,
"CreateCustom": {"Max": 1000, "Remaining": 999}
}
})
.to_string();
let limits: OrgLimits = parse_response_bytes(200, body.as_bytes()).unwrap();
assert_eq!(limits.len(), 2);
assert_eq!(limits.get("DailyApiRequests").unwrap().remaining, 4937);
assert_eq!(
limits
.get("PermissionSets")
.unwrap()
.nested
.get("CreateCustom")
.unwrap()
.max,
1000
);
}
// --- Global describe ---
#[test]
fn parses_describe_global() {
let body = json!({
"encoding": "UTF-8",
"maxBatchSize": 200,
"sobjects": [{
"activateable": false,
"custom": false,
"customSetting": false,
"createable": true,
"deletable": true,
"deprecatedAndHidden": false,
"feedEnabled": true,
"keyPrefix": "001",
"label": "Account",
"labelPlural": "Accounts",
"layoutable": true,
"mergeable": true,
"mruEnabled": true,
"name": "Account",
"queryable": true,
"replicateable": true,
"retrieveable": true,
"searchable": true,
"triggerable": true,
"undeletable": true,
"updateable": true,
"urls": {
"sobject": "/services/data/v66.0/sobjects/Account",
"describe": "/services/data/v66.0/sobjects/Account/describe",
"rowTemplate": "/services/data/v66.0/sobjects/Account/{ID}"
}
}]
})
.to_string();
let dg: DescribeGlobal = parse_response_bytes(200, body.as_bytes()).unwrap();
assert_eq!(dg.encoding, "UTF-8");
assert_eq!(dg.max_batch_size, 200);
assert_eq!(dg.sobjects.len(), 1);
assert_eq!(dg.sobjects[0].name, "Account");
assert_eq!(dg.sobjects[0].key_prefix.as_deref(), Some("001"));
assert_eq!(
dg.sobjects[0].urls.get("describe").map(String::as_str),
Some("/services/data/v66.0/sobjects/Account/describe")
);
}
#[test]
fn describe_global_handles_missing_key_prefix() {
let body = json!({
"encoding": "UTF-8",
"maxBatchSize": 200,
"sobjects": [{
"activateable": false, "custom": false, "customSetting": false,
"createable": false, "deletable": false, "deprecatedAndHidden": false,
"feedEnabled": false, "label": "Foo", "labelPlural": "Foos",
"layoutable": false, "mergeable": false, "mruEnabled": false,
"name": "FooSetting", "queryable": false, "replicateable": false,
"retrieveable": false, "searchable": false, "triggerable": false,
"undeletable": false, "updateable": false, "urls": {}
}]
})
.to_string();
let dg: DescribeGlobal = parse_response_bytes(200, body.as_bytes()).unwrap();
assert!(dg.sobjects[0].key_prefix.is_none());
}
// --- SOSL search results ---
#[test]
fn parses_search_result() {
let body = json!({
"searchRecords": [
{
"attributes": {
"type": "Account",
"url": "/services/data/v66.0/sobjects/Account/001xx"
},
"Id": "001xx"
},
{
"attributes": {
"type": "Contact",
"url": "/services/data/v66.0/sobjects/Contact/003yy"
},
"Id": "003yy"
}
]
})
.to_string();
let sr: SearchResult<Value> = parse_response_bytes(200, body.as_bytes()).unwrap();
assert_eq!(sr.search_records.len(), 2);
assert_eq!(sr.search_records[0]["attributes"]["type"], "Account");
assert_eq!(sr.search_records[1]["Id"], "003yy");
assert!(sr.metadata.is_none());
}
#[test]
fn parses_search_result_with_metadata() {
let body = json!({
"searchRecords": [],
"metadata": {
"entityMetadata": [
{"entityName": "Account", "fieldMetadata": [
{"name": "Name", "label": "Account Name"}
]}
]
}
})
.to_string();
let sr: SearchResult<Value> = parse_response_bytes(200, body.as_bytes()).unwrap();
assert!(sr.search_records.is_empty());
let md = sr.metadata.expect("metadata present");
assert!(md["entityMetadata"].is_array());
}
#[test]
fn parses_empty_search_result() {
let body = r#"{"searchRecords": []}"#;
let sr: SearchResult<Value> = parse_response_bytes(200, body.as_bytes()).unwrap();
assert!(sr.search_records.is_empty());
}
// --- Composite batch responses ---
#[test]
fn parses_batch_response_with_mixed_subresults() {
let body = json!({
"hasErrors": false,
"results": [
{"statusCode": 204, "result": null},
{"statusCode": 200, "result": {
"attributes": {"type": "Account"},
"Id": "001D000000K0fXOIAZ",
"Name": "NewName"
}}
]
})
.to_string();
let resp: BatchResponse = parse_response_bytes(200, body.as_bytes()).unwrap();
assert!(!resp.has_errors);
assert_eq!(resp.results.len(), 2);
assert!(resp.results[0].is_success());
assert_eq!(resp.results[0].status_code, 204);
assert!(resp.results[0].result.is_null());
assert!(resp.results[1].is_success());
assert_eq!(resp.results[1].result["Name"], "NewName");
}
#[test]
fn parses_batch_response_with_subrequest_failure() {
let body = json!({
"hasErrors": true,
"results": [
{"statusCode": 200, "result": {"Id": "001"}},
{"statusCode": 404, "result": [
{"message": "Provided external ID field does not exist or is not accessible: bogus__c",
"errorCode": "NOT_FOUND"}
]}
]
})
.to_string();
let resp: BatchResponse = parse_response_bytes(200, body.as_bytes()).unwrap();
assert!(resp.has_errors);
assert!(resp.results[0].is_success());
assert!(!resp.results[1].is_success());
assert_eq!(resp.results[1].status_code, 404);
assert_eq!(resp.results[1].result[0]["errorCode"], "NOT_FOUND");
}
#[test]
fn parses_batch_response_with_default_results_when_absent() {
let body = r#"{"hasErrors": false}"#;
let resp: BatchResponse = parse_response_bytes(200, body.as_bytes()).unwrap();
assert!(resp.results.is_empty());
}
// --- Bulk API 2.0 ingest and query jobs ---
#[test]
fn parses_bulk_ingest_job_response() {
let body = json!({
"id": "750xx0000004C92AAE",
"operation": "insert",
"object": "Account",
"createdById": "005xx000001IECDAA4",
"createdDate": "2018-12-10T17:50:19.000+0000",
"systemModstamp": "2018-12-10T17:51:27.000+0000",
"state": "Open",
"concurrencyMode": "Parallel",
"contentType": "CSV",
"apiVersion": 60.0,
"contentUrl": "/services/data/v66.0/jobs/ingest/750xx0000004C92AAE/batches",
"lineEnding": "LF",
"columnDelimiter": "COMMA",
"jobType": "V2Ingest"
})
.to_string();
let job: BulkIngestJob = parse_response_bytes(200, body.as_bytes()).unwrap();
assert_eq!(job.id, "750xx0000004C92AAE");
assert_eq!(job.operation, BulkOperation::Insert);
assert_eq!(job.state, BulkJobState::Open);
assert_eq!(job.line_ending, BulkLineEnding::LF);
assert_eq!(job.column_delimiter, BulkColumnDelimiter::Comma);
assert!(job.number_records_processed.is_none());
}
#[test]
fn parses_bulk_ingest_job_complete_with_metrics() {
let body = json!({
"id": "750xx",
"operation": "upsert",
"object": "Account",
"externalIdFieldName": "External_Id__c",
"createdById": "005xx",
"createdDate": "2024-01-01T00:00:00.000+0000",
"systemModstamp": "2024-01-01T00:00:01.000+0000",
"state": "JobComplete",
"concurrencyMode": "Parallel",
"contentType": "CSV",
"apiVersion": 60.0,
"lineEnding": "CRLF",
"columnDelimiter": "TAB",
"jobType": "V2Ingest",
"numberRecordsProcessed": 1000,
"numberRecordsFailed": 5,
"retries": 0,
"totalProcessingTime": 2349,
"apiActiveProcessingTime": 1500,
"apexProcessingTime": 0
})
.to_string();
let job: BulkIngestJob = parse_response_bytes(200, body.as_bytes()).unwrap();
assert_eq!(job.operation, BulkOperation::Upsert);
assert_eq!(job.state, BulkJobState::JobComplete);
assert_eq!(job.line_ending, BulkLineEnding::CRLF);
assert_eq!(job.column_delimiter, BulkColumnDelimiter::Tab);
assert_eq!(
job.external_id_field_name.as_deref(),
Some("External_Id__c")
);
assert_eq!(job.number_records_processed, Some(1000));
assert_eq!(job.number_records_failed, Some(5));
assert!(job.error_message.is_none());
}
#[test]
fn parses_bulk_ingest_job_failed_with_error_message() {
let body = json!({
"id": "750xx",
"operation": "insert",
"object": "Account",
"createdById": "005xx",
"createdDate": "2024-01-01T00:00:00.000+0000",
"systemModstamp": "2024-01-01T00:00:01.000+0000",
"state": "Failed",
"concurrencyMode": "Parallel",
"contentType": "CSV",
"apiVersion": 60.0,
"lineEnding": "LF",
"columnDelimiter": "COMMA",
"jobType": "V2Ingest",
"errorMessage": "InvalidJobState : Aborted by user"
})
.to_string();
let job: BulkIngestJob = parse_response_bytes(200, body.as_bytes()).unwrap();
assert_eq!(job.state, BulkJobState::Failed);
assert_eq!(
job.error_message.as_deref(),
Some("InvalidJobState : Aborted by user")
);
}
#[test]
fn parses_bulk_query_job_response() {
let body = json!({
"id": "750xx",
"operation": "queryAll",
"state": "JobComplete",
"object": "Account",
"createdById": "005xx",
"createdDate": "2024-01-01T00:00:00.000+0000",
"systemModstamp": "2024-01-01T00:00:01.000+0000",
"concurrencyMode": "Parallel",
"contentType": "CSV",
"apiVersion": 60.0,
"lineEnding": "LF",
"columnDelimiter": "COMMA",
"jobType": "V2Query",
"numberRecordsProcessed": 5000,
"retries": 0,
"totalProcessingTime": 8000,
"isPkChunkingSupported": true
})
.to_string();
let job: BulkQueryJob = parse_response_bytes(200, body.as_bytes()).unwrap();
assert_eq!(job.operation, BulkOperation::QueryAll);
assert_eq!(job.state, BulkJobState::JobComplete);
assert_eq!(job.object, "Account");
assert_eq!(job.job_type.as_deref(), Some("V2Query"));
assert_eq!(job.is_pk_chunking_supported, Some(true));
assert!(job.error_message.is_none());
}
#[test]
fn parses_bulk_query_job_create_response_without_jobtype() {
let body = json!({
"id": "750xx",
"operation": "query",
"object": "Account",
"createdById": "005xx",
"createdDate": "2024-01-01T00:00:00.000+0000",
"systemModstamp": "2024-01-01T00:00:00.000+0000",
"state": "UploadComplete",
"concurrencyMode": "Parallel",
"contentType": "CSV",
"apiVersion": 60.0,
"lineEnding": "LF",
"columnDelimiter": "COMMA"
})
.to_string();
let job: BulkQueryJob = parse_response_bytes(200, body.as_bytes()).unwrap();
assert_eq!(job.state, BulkJobState::UploadComplete);
assert!(job.job_type.is_none());
assert!(job.number_records_processed.is_none());
assert!(job.is_pk_chunking_supported.is_none());
}
#[test]
fn parses_bulk_query_job_failed_with_error_message() {
let body = json!({
"id": "750xx",
"operation": "query",
"state": "Failed",
"object": "Account",
"createdById": "005xx",
"createdDate": "2024-01-01T00:00:00.000+0000",
"systemModstamp": "2024-01-01T00:00:01.000+0000",
"concurrencyMode": "Parallel",
"contentType": "CSV",
"apiVersion": 60.0,
"lineEnding": "LF",
"columnDelimiter": "COMMA",
"jobType": "V2Query",
"errorMessage": "MALFORMED_QUERY: unexpected token"
})
.to_string();
let job: BulkQueryJob = parse_response_bytes(200, body.as_bytes()).unwrap();
assert_eq!(job.state, BulkJobState::Failed);
assert_eq!(
job.error_message.as_deref(),
Some("MALFORMED_QUERY: unexpected token")
);
}
// --- Composite and composite-tree responses ---
#[test]
fn parses_composite_response_with_per_subrequest_results() {
let body = json!({
"compositeResponse": [
{
"body": {"id": "001RM000003oCprYAE", "success": true, "errors": []},
"httpHeaders": {"Location": "/services/data/v66.0/sobjects/Account/001RM000003oCprYAE"},
"httpStatusCode": 201,
"referenceId": "NewAccount"
},
{
"body": {"attributes": {"type": "Account"}, "Id": "001RM000003oCprYAE", "Name": "Acme"},
"httpHeaders": {},
"httpStatusCode": 200,
"referenceId": "AccountInfo"
},
{
"body": null,
"httpHeaders": {},
"httpStatusCode": 204,
"referenceId": "ContactPatch"
}
]
})
.to_string();
let resp: CompositeResponse = parse_response_bytes(200, body.as_bytes()).unwrap();
assert_eq!(resp.composite_response.len(), 3);
assert!(resp.composite_response.iter().all(|r| r.is_success()));
assert_eq!(resp.composite_response[0].reference_id, "NewAccount");
assert_eq!(
resp.composite_response[0]
.http_headers
.get("Location")
.and_then(|v| v.to_str().ok()),
Some("/services/data/v66.0/sobjects/Account/001RM000003oCprYAE")
);
assert_eq!(resp.composite_response[1].body["Name"], "Acme");
assert!(resp.composite_response[2].body.is_null());
}
#[test]
fn composite_subresponse_header_lookup_is_case_insensitive() {
let body = json!({
"compositeResponse": [{
"body": {"id": "001xx", "success": true, "errors": []},
"httpHeaders": {"Location": "/services/data/v66.0/sobjects/Account/001xx"},
"httpStatusCode": 201,
"referenceId": "x"
}]
})
.to_string();
let resp: CompositeResponse = parse_response_bytes(200, body.as_bytes()).unwrap();
let headers = &resp.composite_response[0].http_headers;
assert!(headers.get("Location").is_some());
assert!(headers.get("location").is_some());
assert!(headers.get("LOCATION").is_some());
assert_eq!(
headers.get("location").and_then(|v| v.to_str().ok()),
Some("/services/data/v66.0/sobjects/Account/001xx")
);
}
#[test]
fn parses_composite_subresponse_failure_body() {
let body = json!({
"compositeResponse": [{
"body": [{
"message": "The requested resource does not exist",
"errorCode": "NOT_FOUND"
}],
"httpHeaders": {},
"httpStatusCode": 404,
"referenceId": "Lookup"
}]
})
.to_string();
let resp: CompositeResponse = parse_response_bytes(200, body.as_bytes()).unwrap();
let sub = &resp.composite_response[0];
assert!(!sub.is_success());
assert_eq!(sub.http_status_code, 404);
assert_eq!(sub.body[0]["errorCode"], "NOT_FOUND");
}
#[test]
fn parses_composite_response_with_default_empty_when_absent() {
let body = r#"{}"#;
let resp: CompositeResponse = parse_response_bytes(200, body.as_bytes()).unwrap();
assert!(resp.composite_response.is_empty());
}
#[test]
fn parses_composite_tree_success_response() {
let body = json!({
"hasErrors": false,
"results": [
{"referenceId": "ref1", "id": "001D000000K0fXOIAZ"},
{"referenceId": "ref4", "id": "001D000000K0fXPIAZ"},
{"referenceId": "ref2", "id": "003D000000QV9n2IAD"},
{"referenceId": "ref3", "id": "003D000000QV9n3IAD"}
]
})
.to_string();
let resp: CompositeTreeResponse = parse_response_bytes(200, body.as_bytes()).unwrap();
assert!(!resp.has_errors);
assert_eq!(resp.results.len(), 4);
assert!(resp.results.iter().all(CompositeTreeResult::is_success));
assert_eq!(resp.results[0].reference_id, "ref1");
assert_eq!(resp.results[0].id.as_deref(), Some("001D000000K0fXOIAZ"));
assert!(resp.results[0].errors.is_none());
}
#[test]
fn parses_composite_tree_failure_response() {
let body = json!({
"hasErrors": true,
"results": [{
"referenceId": "ref2",
"errors": [{
"statusCode": "INVALID_EMAIL_ADDRESS",
"message": "Email: invalid email address: 123",
"fields": ["Email"]
}]
}]
})
.to_string();
let resp: CompositeTreeResponse = parse_response_bytes(200, body.as_bytes()).unwrap();
assert!(resp.has_errors);
assert_eq!(resp.results.len(), 1);
let result = &resp.results[0];
assert!(!result.is_success());
assert_eq!(result.reference_id, "ref2");
assert!(result.id.is_none());
let errors = result.errors.as_ref().unwrap();
assert_eq!(errors.len(), 1);
assert_eq!(errors[0].status_code, "INVALID_EMAIL_ADDRESS");
assert_eq!(errors[0].fields, vec!["Email".to_string()]);
}
#[test]
fn parses_composite_tree_error_without_fields() {
let body = json!({
"hasErrors": true,
"results": [{
"referenceId": "ref1",
"errors": [{
"statusCode": "UNABLE_TO_LOCK_ROW",
"message": "unable to obtain exclusive access"
}]
}]
})
.to_string();
let resp: CompositeTreeResponse = parse_response_bytes(200, body.as_bytes()).unwrap();
let errors = resp.results[0].errors.as_ref().unwrap();
assert!(errors[0].fields.is_empty());
}
#[test]
fn parses_sobject_create_result() {
let body = r#"{"id":"001xx0000000001","success":true,"errors":[]}"#;
let parsed: SObjectCreateResult = parse_response_bytes(201, body.as_bytes()).unwrap();
assert_eq!(parsed.id, "001xx0000000001");
assert!(parsed.success);
assert!(parsed.errors.is_empty());
assert!(parsed.created.is_none());
}
// --- LimitInfo header parsing ---
#[test]
fn limit_info_parses_well_formed_header() {
let info = LimitInfo::parse("api-usage=42/15000").unwrap();
assert_eq!(info.used, 42);
assert_eq!(info.allowed, 15000);
assert_eq!(info.remaining(), 14958);
}
#[test]
fn limit_info_tolerates_whitespace_around_value() {
let info = LimitInfo::parse("api-usage= 42 / 15000 ").unwrap();
assert_eq!(info.used, 42);
assert_eq!(info.allowed, 15000);
}
#[test]
fn limit_info_returns_none_on_malformed_input() {
assert_eq!(LimitInfo::parse("foo=1/2"), None);
assert_eq!(LimitInfo::parse("api-usage=10"), None);
assert_eq!(LimitInfo::parse("api-usage=ten/fifteen"), None);
assert_eq!(LimitInfo::parse(""), None);
assert_eq!(LimitInfo::parse("api-usage=-5/100"), None);
}
#[test]
fn limit_info_remaining_saturates() {
let info = LimitInfo {
used: 100,
allowed: 50,
};
assert_eq!(info.remaining(), 0);
}
}
// Property tests: parse_response_bytes must never panic and must reject all
// non-2xx statuses, for arbitrary (including non-UTF-8, non-JSON) bodies.
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod property_tests {
use super::*;
use proptest::prelude::*;
use serde_json::Value;
proptest! {
#[test]
fn parse_response_bytes_never_panics_for_value(
status in 100u16..600,
bytes in proptest::collection::vec(any::<u8>(), 0..256),
) {
// Result is discarded: only absence of panics is being checked.
let _: Result<Value, _> = parse_response_bytes(status, &bytes);
}
#[test]
fn non_2xx_status_always_returns_err(
status in (100u16..200).prop_union(300u16..600),
bytes in proptest::collection::vec(any::<u8>(), 0..256),
) {
let result: Result<Value, _> = parse_response_bytes(status, &bytes);
prop_assert!(result.is_err(), "status {status} must yield Err");
}
}
}