// busbar-sf-client 0.0.3
//
// Core HTTP client infrastructure for Salesforce APIs with retry, compression, and rate limiting.
//! HTTP response handling with Salesforce-specific extensions.

use serde::de::DeserializeOwned;
use std::time::Duration;

use crate::error::{Error, ErrorKind, Result};

/// Wrapper around reqwest::Response with additional functionality.
///
/// Owns the underlying response. Header and status accessors borrow from it;
/// the body readers (`text`, `bytes`, `json`) and `into_inner` take `self` by
/// value and therefore consume the wrapper.
#[derive(Debug)]
pub struct Response {
    // The wrapped reqwest response that every accessor delegates to.
    inner: reqwest::Response,
}

impl Response {
    /// Create a new Response from a reqwest::Response.
    pub(crate) fn new(inner: reqwest::Response) -> Self {
        Self { inner }
    }

    /// Get the HTTP status code as a plain number (e.g. `200`).
    pub fn status(&self) -> u16 {
        self.inner.status().as_u16()
    }

    /// Returns true if the response status is successful (2xx).
    pub fn is_success(&self) -> bool {
        self.inner.status().is_success()
    }

    /// Returns true if this is a 304 Not Modified response.
    pub fn is_not_modified(&self) -> bool {
        self.status() == 304
    }

    /// Get a header value.
    ///
    /// Returns `None` when the header is absent or when its value cannot be
    /// converted to a `&str`.
    pub fn header(&self, name: &str) -> Option<&str> {
        self.inner.headers().get(name)?.to_str().ok()
    }

    /// Get the ETag header value.
    pub fn etag(&self) -> Option<&str> {
        self.header("etag")
    }

    /// Get the Last-Modified header value.
    pub fn last_modified(&self) -> Option<&str> {
        self.header("last-modified")
    }

    /// Get the Retry-After header as a Duration.
    ///
    /// Only the delay-in-seconds form (a non-negative integer) is understood.
    /// The HTTP-date form is not parsed and yields `None`, as does a missing
    /// header.
    pub fn retry_after(&self) -> Option<Duration> {
        self.header("retry-after")?
            .trim()
            .parse::<u64>()
            .ok()
            .map(Duration::from_secs)
    }

    /// Get the Sforce-Locator header (used for Bulk API pagination).
    pub fn sforce_locator(&self) -> Option<&str> {
        self.header("sforce-locator")
    }

    /// Get the Content-Type header.
    pub fn content_type(&self) -> Option<&str> {
        self.header("content-type")
    }

    /// Get the response body as text.
    ///
    /// Consumes the response.
    ///
    /// # Errors
    ///
    /// Returns the underlying reqwest error, converted into this crate's
    /// error type, if reading the body fails.
    pub async fn text(self) -> Result<String> {
        self.inner.text().await.map_err(Into::into)
    }

    /// Get the response body as bytes.
    ///
    /// Consumes the response.
    ///
    /// # Errors
    ///
    /// Returns the underlying reqwest error, converted into this crate's
    /// error type, if reading the body fails.
    pub async fn bytes(self) -> Result<bytes::Bytes> {
        self.inner.bytes().await.map_err(Into::into)
    }

    /// Deserialize the response body as JSON.
    ///
    /// Consumes the response.
    ///
    /// # Errors
    ///
    /// Returns the underlying reqwest error, converted into this crate's
    /// error type, if reading or deserializing the body fails.
    pub async fn json<T: DeserializeOwned>(self) -> Result<T> {
        self.inner.json().await.map_err(Into::into)
    }

    /// Get access to the inner reqwest::Response.
    pub fn into_inner(self) -> reqwest::Response {
        self.inner
    }

    /// Get API usage limits from response headers.
    ///
    /// Reads the `Sforce-Limit-Info` header and returns the first well-formed
    /// `api-usage=<used>/<limit>` entry, e.g. `api-usage=25/15000`. Returns
    /// `None` when the header is missing or contains no such entry.
    pub fn api_usage(&self) -> Option<ApiUsage> {
        let info = self.header("sforce-limit-info")?;

        // Salesforce separates entries with ';' (for example
        // "api-usage=18/5000; per-app-api-usage=17/250(appName=...)").
        // ',' is accepted as well so that values this method already parsed
        // keep working.
        info.split(|c: char| matches!(c, ';' | ','))
            // `strip_prefix` removes the key exactly once and, because each
            // entry is trimmed first, skips "per-app-api-usage=...".
            .filter_map(|entry| entry.trim().strip_prefix("api-usage="))
            .find_map(|usage| {
                let (used, limit) = usage.split_once('/')?;
                Some(ApiUsage {
                    used: used.trim().parse().ok()?,
                    limit: limit.trim().parse().ok()?,
                })
            })
    }
}

/// API usage counters taken from response headers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ApiUsage {
    /// API calls consumed so far.
    pub used: u64,
    /// Maximum number of API calls allowed.
    pub limit: u64,
}

impl ApiUsage {
    /// Calls still available: `limit - used`, clamped to zero when `used`
    /// exceeds `limit`.
    pub fn remaining(&self) -> u64 {
        let Self { used, limit } = *self;
        limit.saturating_sub(used)
    }

    /// Share of the limit already consumed, in percent.
    ///
    /// A limit of zero is reported as `100.0` rather than dividing by zero.
    pub fn percentage(&self) -> f64 {
        match self.limit {
            0 => 100.0,
            limit => {
                let ratio = self.used as f64 / limit as f64;
                ratio * 100.0
            }
        }
    }

    /// Returns true if API usage is at or above the given percentage
    /// threshold.
    pub fn is_above_threshold(&self, threshold_percent: f64) -> bool {
        threshold_percent <= self.percentage()
    }
}

/// Extension trait for processing Salesforce API responses.
pub trait ResponseExt {
    /// Check for Salesforce API errors and convert to appropriate error type.
    ///
    /// Takes the response by value and returns a `Send` future that resolves
    /// to `Result<Response>`.
    fn check_salesforce_error(self) -> impl std::future::Future<Output = Result<Response>> + Send;
}

impl ResponseExt for Response {
    async fn check_salesforce_error(self) -> Result<Response> {
        let status = self.status();

        if self.is_success() || self.is_not_modified() {
            return Ok(self);
        }

        // Try to parse Salesforce error response
        let body = self.text().await.unwrap_or_default();

        // Check for rate limiting
        if status == 429 {
            // Try to extract retry-after from the response we already consumed
            // In practice, we should check this before consuming the body
            return Err(Error::new(ErrorKind::RateLimited { retry_after: None }));
        }

        // Try to parse as Salesforce error JSON
        if let Ok(errors) = serde_json::from_str::<Vec<SalesforceErrorResponse>>(&body) {
            if let Some(err) = errors.into_iter().next() {
                return Err(Error::new(ErrorKind::SalesforceApi {
                    error_code: err.error_code,
                    message: sanitize_error_message(&err.message),
                    fields: err.fields.unwrap_or_default(),
                }));
            }
        }

        // Try to parse as single error object
        if let Ok(err) = serde_json::from_str::<SalesforceErrorResponse>(&body) {
            return Err(Error::new(ErrorKind::SalesforceApi {
                error_code: err.error_code,
                message: sanitize_error_message(&err.message),
                fields: err.fields.unwrap_or_default(),
            }));
        }

        // Map status codes to error kinds - use sanitized messages to avoid
        // potentially exposing sensitive data from response bodies
        let sanitized = sanitize_error_message(&body);
        let kind = match status {
            401 => ErrorKind::Authentication(sanitized),
            403 => ErrorKind::Authorization(sanitized),
            404 => ErrorKind::NotFound(sanitized),
            412 => ErrorKind::PreconditionFailed(sanitized),
            _ => ErrorKind::Http {
                status,
                message: sanitized,
            },
        };

        Err(Error::new(kind))
    }
}

/// Sanitize an error message to prevent exposing sensitive data.
///
/// This function:
/// - Replaces anything that looks like a Salesforce access token
///   (`00` + 13 or more alphanumerics + `!` + token characters) with
///   `[REDACTED_TOKEN]`
/// - Replaces `sid=` followed by 20 or more alphanumerics with
///   `sid=[REDACTED]`
/// - Truncates results longer than 500 bytes, backing off to the nearest
///   UTF-8 character boundary, and appends `...[truncated]`
fn sanitize_error_message(message: &str) -> String {
    const MAX_LENGTH: usize = 500;

    // Compiled once per process instead of on every call.
    static TOKEN_PATTERN: std::sync::OnceLock<regex_lite::Regex> = std::sync::OnceLock::new();
    static SESSION_PATTERN: std::sync::OnceLock<regex_lite::Regex> = std::sync::OnceLock::new();

    // Remove anything that looks like a Bearer token or access token
    // Salesforce tokens typically start with "00D" and are 100+ chars
    let token_pattern = TOKEN_PATTERN.get_or_init(|| {
        regex_lite::Regex::new(r"00[A-Za-z0-9]{13,}[!][A-Za-z0-9_.]+")
            .expect("token pattern is a valid regex literal")
    });
    // Remove session IDs (typically 24 chars alphanumeric)
    let session_pattern = SESSION_PATTERN.get_or_init(|| {
        regex_lite::Regex::new(r"sid=[A-Za-z0-9]{20,}")
            .expect("session pattern is a valid regex literal")
    });

    let without_tokens = token_pattern.replace_all(message, "[REDACTED_TOKEN]");
    let mut sanitized = session_pattern
        .replace_all(&without_tokens, "sid=[REDACTED]")
        .into_owned();

    // Truncate if too long
    if sanitized.len() > MAX_LENGTH {
        // `String::truncate` panics when the cut is inside a multi-byte
        // character, so step back to the closest boundary. Index 0 is always
        // a boundary, which bounds the loop.
        let mut cut = MAX_LENGTH;
        while !sanitized.is_char_boundary(cut) {
            cut -= 1;
        }
        sanitized.truncate(cut);
        sanitized.push_str("...[truncated]");
    }

    sanitized
}

/// Salesforce API error response format.
///
/// Describes one error object. Error bodies are deserialized either as a
/// single object of this shape or as a JSON array of them.
#[derive(Debug, serde::Deserialize)]
struct SalesforceErrorResponse {
    /// Error code string, accepted under the key `error_code` or, through the
    /// serde alias, `errorCode`.
    #[serde(alias = "errorCode")]
    error_code: String,
    /// Error description; deserialization fails if it is missing.
    message: String,
    /// Field names attached to the error, if any; `None` when the key is
    /// absent.
    fields: Option<Vec<String>>,
}

// Unit tests for the parts of this module that need no live HTTP response:
// `ApiUsage` arithmetic, `sanitize_error_message`, and deserialization of
// `SalesforceErrorResponse`.
#[cfg(test)]
mod tests {
    use super::*;

    // 100 of 1000 calls used: 900 remaining, 10 %, and the threshold check
    // is true for 5 % but false for 50 %.
    #[test]
    fn test_api_usage() {
        let usage = ApiUsage {
            used: 100,
            limit: 1000,
        };

        assert_eq!(usage.remaining(), 900);
        assert!((usage.percentage() - 10.0).abs() < 0.001);
        assert!(!usage.is_above_threshold(50.0));
        assert!(usage.is_above_threshold(5.0));
    }

    // Boundary values: a fully used quota and a zero limit both report
    // 0 remaining and 100 %.
    #[test]
    fn test_api_usage_edge_cases() {
        let usage = ApiUsage {
            used: 1000,
            limit: 1000,
        };
        assert_eq!(usage.remaining(), 0);
        assert!((usage.percentage() - 100.0).abs() < 0.001);

        // A zero limit must not divide by zero; it is reported as 100 %.
        let usage = ApiUsage { used: 0, limit: 0 };
        assert_eq!(usage.remaining(), 0);
        assert!((usage.percentage() - 100.0).abs() < 0.001);
    }

    // =========================================================================
    // sanitize_error_message tests
    // =========================================================================

    // A token-shaped substring is replaced by the redaction marker and its
    // secret part no longer appears in the output.
    #[test]
    fn test_sanitize_redacts_access_tokens() {
        // Salesforce access tokens start with "00D" (org ID) followed by 13+ chars, "!", then more chars
        let msg = "Session expired: 00Dxx0000001gEF!AQcAQH3k9s7LKbp_example_token_value.here";
        let sanitized = sanitize_error_message(msg);
        assert!(
            sanitized.contains("[REDACTED_TOKEN]"),
            "Should redact token: {sanitized}"
        );
        assert!(
            !sanitized.contains("AQcAQH3k9s7LKbp"),
            "Should not contain token value: {sanitized}"
        );
    }

    // `sid=<value>` keeps the `sid=` key but loses the value.
    #[test]
    fn test_sanitize_redacts_session_ids() {
        let msg = "Invalid session: sid=abc123def456ghi789jkl012";
        let sanitized = sanitize_error_message(msg);
        assert!(
            sanitized.contains("sid=[REDACTED]"),
            "Should redact session ID: {sanitized}"
        );
        assert!(
            !sanitized.contains("abc123def456"),
            "Should not contain session ID value: {sanitized}"
        );
    }

    // A 600-byte message is shortened and ends with the truncation marker.
    #[test]
    fn test_sanitize_truncates_long_messages() {
        let long_msg = "x".repeat(600);
        let sanitized = sanitize_error_message(&long_msg);
        assert!(
            sanitized.len() < 600,
            "Should be truncated: len={}",
            sanitized.len()
        );
        assert!(
            sanitized.ends_with("...[truncated]"),
            "Should end with truncation marker: {sanitized}"
        );
    }

    // Messages without secrets and under the length limit come back unchanged.
    #[test]
    fn test_sanitize_passes_through_clean_messages() {
        let msg = "No such column 'foo' on entity 'Account'";
        assert_eq!(sanitize_error_message(msg), msg);
    }

    // Every token occurrence is redacted, not just the first one.
    #[test]
    fn test_sanitize_redacts_multiple_tokens() {
        let msg = "Token1: 00Dxx0000001gEF!token1_value and Token2: 00Dyy0000002gEF!token2_value";
        let sanitized = sanitize_error_message(msg);
        // Both tokens should be redacted
        assert!(
            !sanitized.contains("token1_value"),
            "Should redact first token"
        );
        assert!(
            !sanitized.contains("token2_value"),
            "Should redact second token"
        );
    }

    // =========================================================================
    // SalesforceErrorResponse deserialization tests
    // =========================================================================

    // Array form with all three keys present.
    #[test]
    fn test_salesforce_error_response_array_format() {
        let json = r#"[{"errorCode":"INVALID_FIELD","message":"No such column","fields":["Foo"]}]"#;
        let errors: Vec<SalesforceErrorResponse> = serde_json::from_str(json).unwrap();
        assert_eq!(errors.len(), 1);
        assert_eq!(errors[0].error_code, "INVALID_FIELD");
        assert_eq!(errors[0].message, "No such column");
        assert_eq!(errors[0].fields, Some(vec!["Foo".to_string()]));
    }

    // Single-object form; a missing `fields` key deserializes to `None`.
    #[test]
    fn test_salesforce_error_response_single_object() {
        let json = r#"{"errorCode":"NOT_FOUND","message":"The requested resource does not exist"}"#;
        let err: SalesforceErrorResponse = serde_json::from_str(json).unwrap();
        assert_eq!(err.error_code, "NOT_FOUND");
        assert_eq!(err.message, "The requested resource does not exist");
        assert!(err.fields.is_none());
    }

    // The camelCase key is accepted through the serde alias on `error_code`.
    #[test]
    fn test_salesforce_error_response_with_error_code_alias() {
        // Salesforce sometimes uses "errorCode" (camelCase)
        let json = r#"{"errorCode":"MALFORMED_QUERY","message":"unexpected token"}"#;
        let err: SalesforceErrorResponse = serde_json::from_str(json).unwrap();
        assert_eq!(err.error_code, "MALFORMED_QUERY");
    }

    // An empty JSON array is valid input and yields no errors.
    #[test]
    fn test_salesforce_error_response_empty_array() {
        let json = "[]";
        let errors: Vec<SalesforceErrorResponse> = serde_json::from_str(json).unwrap();
        assert!(errors.is_empty());
    }

    // Several errors in one array, mixing entries with and without `fields`.
    #[test]
    fn test_salesforce_error_response_multiple_errors() {
        let json = r#"[
            {"errorCode":"REQUIRED_FIELD_MISSING","message":"Required fields missing","fields":["Name","Email"]},
            {"errorCode":"FIELD_CUSTOM_VALIDATION_EXCEPTION","message":"Must be positive"}
        ]"#;
        let errors: Vec<SalesforceErrorResponse> = serde_json::from_str(json).unwrap();
        assert_eq!(errors.len(), 2);
        assert_eq!(
            errors[0].fields,
            Some(vec!["Name".to_string(), "Email".to_string()])
        );
        assert!(errors[1].fields.is_none());
    }
}