salesforce_client/lib.rs
1//! # Salesforce API Client - Enterprise Edition
2//!
3//! A production-grade, type-driven Salesforce REST API client library for Rust.
4//!
5//! ## Features
6//! - 🔐 **OAuth 2.0 Authentication**: Automatic token refresh
7//! - 📄 **Automatic Pagination**: Handle large result sets transparently
8//! - 🔁 **Retry Logic**: Exponential backoff for transient failures
9//! - ⚡ **Caching**: Reduce API calls with intelligent caching
10//! - 🚦 **Rate Limiting**: Respect Salesforce API limits
11//! - 📝 **CRUD Operations**: Create, Read, Update, Delete
12//! - 🏗️ **Query Builder**: Type-safe SOQL construction
13//! - 📊 **Comprehensive Logging**: Built-in tracing support
14//! - 🎯 **Type Safety**: Generic methods with compile-time guarantees
15//!
16//! ## Quick Start
17//! ```no_run
18//! use salesforce_client::{SalesforceClient, ClientConfig, SfError};
19//! use serde::{Deserialize, Serialize};
20//!
21//! #[derive(Debug, Clone, Deserialize, Serialize)]
22//! struct Account {
23//! #[serde(rename = "Id")]
24//! id: String,
25//! #[serde(rename = "Name")]
26//! name: String,
27//! }
28//!
29//! #[tokio::main]
30//! async fn main() -> Result<(), SfError> {
31//! // Initialize with configuration
32//! let config = ClientConfig::new(
33//! "https://your-instance.salesforce.com",
34//! "your_access_token",
35//! );
36//!
37//! let client = SalesforceClient::new(config);
38//!
39//! // Query with automatic pagination and caching
40//! let accounts: Vec<Account> = client
41//! .query("SELECT Id, Name FROM Account LIMIT 10")
42//! .await?;
43//!
44//! Ok(())
45//! }
46//! ```
47
48// Module declarations
49pub mod auth;
50pub mod cache;
51pub mod crud;
52pub mod error;
53pub mod pagination;
54pub mod query_builder;
55pub mod rate_limit;
56pub mod retry;
57
58// Re-exports for convenience
59pub use auth::{AccessToken, OAuthCredentials, TokenManager};
60pub use cache::{CacheConfig, QueryCache};
61pub use crud::{InsertResponse, UpdateResponse, UpsertBuilder};
62pub use error::{SfError, SfResult};
63pub use pagination::{PaginatedQuery, QueryOptions};
64pub use query_builder::{CountQueryBuilder, QueryBuilder, SubqueryBuilder};
65pub use rate_limit::{RateLimitConfig, RateLimiter};
66pub use retry::RetryConfig;
67
68use serde::de::DeserializeOwned;
69use serde::Serialize;
70use std::sync::Arc;
71use tracing::{debug, info, instrument};
72
73/// Client configuration builder
74///
75/// Provides a fluent API for configuring the Salesforce client with all
76/// enterprise features.
77#[derive(Debug, Clone)]
78pub struct ClientConfig {
79 /// Base URL of the Salesforce instance
80 pub base_url: String,
81
82 /// Access token for authentication
83 pub access_token: String,
84
85 /// Retry configuration
86 pub retry_config: RetryConfig,
87
88 /// Cache configuration
89 pub cache_config: CacheConfig,
90
91 /// Rate limit configuration
92 pub rate_limit_config: RateLimitConfig,
93
94 /// Enable automatic pagination
95 pub auto_paginate: bool,
96}
97
98impl ClientConfig {
99 /// Create a new configuration with defaults
100 pub fn new(base_url: impl Into<String>, access_token: impl Into<String>) -> Self {
101 Self {
102 base_url: base_url.into(),
103 access_token: access_token.into(),
104 retry_config: RetryConfig::default(),
105 cache_config: CacheConfig::default(),
106 rate_limit_config: RateLimitConfig::default(),
107 auto_paginate: true,
108 }
109 }
110
111 /// Configure retry behavior
112 pub fn with_retry(mut self, config: RetryConfig) -> Self {
113 self.retry_config = config;
114 self
115 }
116
117 /// Configure caching
118 pub fn with_cache(mut self, config: CacheConfig) -> Self {
119 self.cache_config = config;
120 self
121 }
122
123 /// Configure rate limiting
124 pub fn with_rate_limit(mut self, config: RateLimitConfig) -> Self {
125 self.rate_limit_config = config;
126 self
127 }
128
129 /// Disable automatic pagination
130 pub fn no_pagination(mut self) -> Self {
131 self.auto_paginate = false;
132 self
133 }
134
135 /// Disable all optional features (for testing or simple use cases)
136 pub fn minimal() -> Self {
137 Self {
138 base_url: String::new(),
139 access_token: String::new(),
140 retry_config: RetryConfig::no_retry(),
141 cache_config: CacheConfig::disabled(),
142 rate_limit_config: RateLimitConfig::unlimited(),
143 auto_paginate: false,
144 }
145 }
146}
147
148/// Enterprise-grade Salesforce API client
149///
150/// This client provides comprehensive features for production use:
151/// - Automatic token refresh via OAuth
152/// - Intelligent caching to reduce API calls
153/// - Retry logic with exponential backoff
154/// - Rate limiting to respect API quotas
155/// - Automatic pagination for large result sets
156/// - Full CRUD operations
157/// - Type-safe query building
158///
159/// ## Design Principles
160/// - **Enterprise-ready**: Built-in retry, caching, and rate limiting
161/// - **Type-safe**: Generic methods with compile-time guarantees
162/// - **Async-first**: Non-blocking I/O throughout
163/// - **Observable**: Comprehensive tracing for debugging
164/// - **Composable**: Arc-based sharing for concurrent use
165#[derive(Clone)]
166pub struct SalesforceClient {
167 /// Configuration
168 config: Arc<ClientConfig>,
169
170 /// HTTP client with connection pooling
171 http_client: reqwest::Client,
172
173 /// Query result cache
174 query_cache: Arc<QueryCache>,
175
176 /// Rate limiter
177 rate_limiter: Arc<RateLimiter>,
178
179 /// CRUD operations handler
180 crud: Arc<crud::CrudOperations>,
181}
182
183impl SalesforceClient {
184 /// Creates a new Salesforce API client with the given configuration
185 ///
186 /// # Example
187 /// ```no_run
188 /// use salesforce_client::{SalesforceClient, ClientConfig};
189 ///
190 /// let config = ClientConfig::new(
191 /// "https://yourinstance.salesforce.com",
192 /// "00D... your token",
193 /// );
194 ///
195 /// let client = SalesforceClient::new(config);
196 /// ```
197 pub fn new(config: ClientConfig) -> Self {
198 let http_client = reqwest::Client::new();
199 let query_cache = Arc::new(QueryCache::new(config.cache_config.clone()));
200 let rate_limiter = Arc::new(RateLimiter::new(config.rate_limit_config.clone()));
201
202 let crud = Arc::new(crud::CrudOperations::new(
203 http_client.clone(),
204 config.base_url.clone(),
205 config.access_token.clone(),
206 ));
207
208 info!(
209 "Salesforce client initialized with base URL: {}",
210 config.base_url
211 );
212
213 Self {
214 config: Arc::new(config),
215 http_client,
216 query_cache,
217 rate_limiter,
218 crud,
219 }
220 }
221
222 /// Create a client with OAuth credentials (automatic token refresh)
223 ///
224 /// # Example
225 /// ```no_run
226 /// use salesforce_client::{SalesforceClient, OAuthCredentials};
227 ///
228 /// let credentials = OAuthCredentials {
229 /// client_id: "your_client_id".to_string(),
230 /// client_secret: "your_client_secret".to_string(),
231 /// refresh_token: Some("your_refresh_token".to_string()),
232 /// username: None,
233 /// password: None,
234 /// };
235 ///
236 /// // This will be implemented with TokenManager integration
237 /// // let client = SalesforceClient::with_oauth(credentials).await?;
238 /// ```
239 pub async fn with_oauth(credentials: OAuthCredentials) -> SfResult<Self> {
240 let token_manager = TokenManager::new(credentials);
241 let token = token_manager.get_token().await?;
242
243 let config = ClientConfig::new(token.instance_url(), token.token());
244
245 Ok(Self::new(config))
246 }
247
248 /// Execute a SOQL query with caching, retry, and rate limiting
249 ///
250 /// This method automatically handles:
251 /// - Rate limiting (waits if limit exceeded)
252 /// - Caching (returns cached results if available)
253 /// - Retry logic (retries transient failures)
254 /// - Pagination (fetches only first page by default)
255 ///
256 /// # Example
257 /// ```no_run
258 /// # use salesforce_client::{SalesforceClient, ClientConfig, SfError};
259 /// # use serde::{Deserialize, Serialize};
260 /// # #[derive(Debug, Clone, Deserialize, Serialize)]
261 /// # struct Account { #[serde(rename = "Id")] id: String }
262 /// # async fn example() -> Result<(), SfError> {
263 /// # let config = ClientConfig::new("https://example.com", "token");
264 /// # let client = SalesforceClient::new(config);
265 /// let accounts: Vec<Account> = client
266 /// .query("SELECT Id, Name FROM Account WHERE AnnualRevenue > 1000000")
267 /// .await?;
268 /// # Ok(())
269 /// # }
270 /// ```
271 #[instrument(skip(self, soql))]
272 pub async fn query<T>(&self, soql: impl AsRef<str>) -> SfResult<Vec<T>>
273 where
274 T: DeserializeOwned + Serialize + Clone,
275 {
276 let query_str = soql.as_ref();
277
278 // Check cache first
279 if let Some(cached) = self.query_cache.get::<T>(query_str).await {
280 debug!("Returning cached query results");
281 return Ok(cached);
282 }
283
284 // Apply rate limiting
285 self.rate_limiter.acquire().await?;
286
287 // Execute query with retry logic
288 let result = retry::with_retry(&self.config.retry_config, || async {
289 self.execute_query(query_str).await
290 })
291 .await?;
292
293 // Cache the results (clone only if T is Clone, otherwise skip caching)
294 // Note: We require T: Clone for caching
295 if let Ok(()) = self.query_cache.set(query_str, result.clone()).await {
296 // Cached successfully
297 }
298
299 Ok(result)
300 }
301
302 /// Execute query without caching (internal method)
303 async fn execute_query<T>(&self, soql: &str) -> SfResult<Vec<T>>
304 where
305 T: DeserializeOwned,
306 {
307 let url = format!("{}/services/data/v57.0/query", self.config.base_url);
308
309 debug!("Executing SOQL query");
310
311 let response = self
312 .http_client
313 .get(&url)
314 .query(&[("q", soql)])
315 .header(
316 "Authorization",
317 format!("Bearer {}", self.config.access_token),
318 )
319 .send()
320 .await?;
321
322 let status = response.status();
323 if !status.is_success() {
324 // Check for rate limit before consuming response body
325 let retry_after = if status.as_u16() == 429 {
326 response
327 .headers()
328 .get("Retry-After")
329 .and_then(|v| v.to_str().ok())
330 .and_then(|s| s.parse().ok())
331 } else {
332 None
333 };
334
335 let body = response.text().await.unwrap_or_default();
336
337 if status.as_u16() == 429 {
338 return Err(SfError::RateLimit { retry_after });
339 }
340
341 return Err(SfError::Api {
342 status: status.as_u16(),
343 body,
344 });
345 }
346
347 let query_response: pagination::QueryResponse<T> = response.json().await?;
348
349 info!("Query returned {} records", query_response.records.len());
350 Ok(query_response.records)
351 }
352
353 /// Query with automatic pagination - fetches ALL results
354 ///
355 /// **Warning**: This can consume significant memory for large result sets.
356 /// For queries returning >100k records, consider using `query_paginated` instead.
357 ///
358 /// # Example
359 /// ```no_run
360 /// # use salesforce_client::{SalesforceClient, ClientConfig, SfError};
361 /// # use serde::{Deserialize, Serialize};
362 /// # #[derive(Debug, Clone, Deserialize, Serialize)]
363 /// # struct Account { #[serde(rename = "Id")] id: String }
364 /// # async fn example() -> Result<(), SfError> {
365 /// # let config = ClientConfig::new("https://example.com", "token");
366 /// # let client = SalesforceClient::new(config);
367 /// // Fetches all accounts, automatically handling pagination
368 /// let accounts: Vec<Account> = client
369 /// .query_all("SELECT Id, Name FROM Account")
370 /// .await?;
371 /// # Ok(())
372 /// # }
373 /// ```
374 #[instrument(skip(self, soql))]
375 pub async fn query_all<T>(&self, soql: impl AsRef<str>) -> SfResult<Vec<T>>
376 where
377 T: DeserializeOwned + Serialize,
378 {
379 info!("Executing query with full pagination");
380
381 let mut all_records = Vec::new();
382 let mut pages = self.query_paginated::<T>(soql.as_ref()).await?;
383
384 while let Some(batch) = pages.next().await? {
385 all_records.extend(batch);
386 }
387
388 info!("Collected {} total records", all_records.len());
389 Ok(all_records)
390 }
391
392 /// Get a paginated query iterator for manual pagination control
393 ///
394 /// This is the most memory-efficient way to handle large result sets.
395 ///
396 /// # Example
397 /// ```no_run
398 /// # use salesforce_client::{SalesforceClient, ClientConfig, SfError};
399 /// # use serde::{Deserialize, Serialize};
400 /// # #[derive(Debug, Clone, Deserialize, Serialize)]
401 /// # struct Account { #[serde(rename = "Id")] id: String }
402 /// # async fn example() -> Result<(), SfError> {
403 /// # let config = ClientConfig::new("https://example.com", "token");
404 /// # let client = SalesforceClient::new(config);
405 /// let mut pages = client
406 /// .query_paginated::<Account>("SELECT Id FROM Account")
407 /// .await?;
408 ///
409 /// while let Some(batch) = pages.next().await? {
410 /// for account in batch {
411 /// println!("{:?}", account);
412 /// }
413 /// }
414 /// # Ok(())
415 /// # }
416 /// ```
417 pub async fn query_paginated<T>(&self, soql: &str) -> SfResult<PaginatedQuery<T>>
418 where
419 T: DeserializeOwned,
420 {
421 // Execute first query to get initial results and nextRecordsUrl
422 let url = format!("{}/services/data/v57.0/query", self.config.base_url);
423
424 self.rate_limiter.acquire().await?;
425
426 let response = self
427 .http_client
428 .get(&url)
429 .query(&[("q", soql)])
430 .header(
431 "Authorization",
432 format!("Bearer {}", self.config.access_token),
433 )
434 .send()
435 .await?;
436
437 let status = response.status();
438 if !status.is_success() {
439 let body = response.text().await.unwrap_or_default();
440 return Err(SfError::Api {
441 status: status.as_u16(),
442 body,
443 });
444 }
445
446 let query_response: pagination::QueryResponse<T> = response.json().await?;
447 let next_url = query_response.next_records_url.clone();
448
449 Ok(PaginatedQuery::new(
450 self.http_client.clone(),
451 self.config.base_url.clone(),
452 self.config.access_token.clone(),
453 next_url,
454 ))
455 }
456
457 /// Insert a new record
458 ///
459 /// # Example
460 /// ```no_run
461 /// # use salesforce_client::{SalesforceClient, ClientConfig, SfError};
462 /// # use serde::Serialize;
463 /// #[derive(Serialize)]
464 /// struct NewAccount {
465 /// #[serde(rename = "Name")]
466 /// name: String,
467 /// #[serde(rename = "Industry")]
468 /// industry: String,
469 /// }
470 /// # async fn example() -> Result<(), SfError> {
471 /// # let config = ClientConfig::new("https://example.com", "token");
472 /// # let client = SalesforceClient::new(config);
473 ///
474 /// let account = NewAccount {
475 /// name: "Acme Corporation".to_string(),
476 /// industry: "Technology".to_string(),
477 /// };
478 ///
479 /// let response = client.insert("Account", &account).await?;
480 /// println!("Created account with ID: {}", response.id);
481 /// # Ok(())
482 /// # }
483 /// ```
484 #[instrument(skip(self, data))]
485 pub async fn insert<T: Serialize>(&self, sobject: &str, data: &T) -> SfResult<InsertResponse> {
486 self.rate_limiter.acquire().await?;
487
488 retry::with_retry(&self.config.retry_config, || async {
489 self.crud.insert(sobject, data).await
490 })
491 .await
492 }
493
494 /// Update an existing record
495 ///
496 /// # Example
497 /// ```no_run
498 /// # use salesforce_client::{SalesforceClient, ClientConfig, SfError};
499 /// # use serde::Serialize;
500 /// #[derive(Serialize)]
501 /// struct AccountUpdate {
502 /// #[serde(rename = "Name")]
503 /// name: String,
504 /// }
505 /// # async fn example() -> Result<(), SfError> {
506 /// # let config = ClientConfig::new("https://example.com", "token");
507 /// # let client = SalesforceClient::new(config);
508 ///
509 /// let update = AccountUpdate {
510 /// name: "Acme Corp (Updated)".to_string(),
511 /// };
512 ///
513 /// client.update("Account", "001xx000003DGbX", &update).await?;
514 /// println!("Account updated successfully");
515 /// # Ok(())
516 /// # }
517 /// ```
518 #[instrument(skip(self, data))]
519 pub async fn update<T: Serialize>(&self, sobject: &str, id: &str, data: &T) -> SfResult<()> {
520 self.rate_limiter.acquire().await?;
521
522 retry::with_retry(&self.config.retry_config, || async {
523 self.crud.update(sobject, id, data).await
524 })
525 .await?;
526
527 // Invalidate cache for this record
528 self.query_cache.clear().await;
529
530 Ok(())
531 }
532
533 /// Delete a record
534 ///
535 /// # Example
536 /// ```no_run
537 /// # use salesforce_client::{SalesforceClient, ClientConfig, SfError};
538 /// # async fn example() -> Result<(), SfError> {
539 /// # let config = ClientConfig::new("https://example.com", "token");
540 /// # let client = SalesforceClient::new(config);
541 ///
542 /// client.delete("Account", "001xx000003DGbX").await?;
543 /// println!("Account deleted successfully");
544 /// # Ok(())
545 /// # }
546 /// ```
547 #[instrument(skip(self))]
548 pub async fn delete(&self, sobject: &str, id: &str) -> SfResult<()> {
549 self.rate_limiter.acquire().await?;
550
551 retry::with_retry(&self.config.retry_config, || async {
552 self.crud.delete(sobject, id).await
553 })
554 .await?;
555
556 // Invalidate cache
557 self.query_cache.clear().await;
558
559 Ok(())
560 }
561
562 /// Upsert a record (insert or update based on external ID)
563 ///
564 /// # Example
565 /// ```no_run
566 /// # use salesforce_client::{SalesforceClient, ClientConfig, UpsertBuilder, SfError};
567 /// # use serde::Serialize;
568 /// #[derive(Serialize)]
569 /// struct Account {
570 /// #[serde(rename = "Name")]
571 /// name: String,
572 /// }
573 /// # async fn example() -> Result<(), SfError> {
574 /// # let config = ClientConfig::new("https://example.com", "token");
575 /// # let client = SalesforceClient::new(config);
576 ///
577 /// let account = Account {
578 /// name: "Acme Corporation".to_string(),
579 /// };
580 ///
581 /// let upsert = UpsertBuilder::new("External_Id__c", "EXT-12345");
582 /// let response = client.upsert("Account", upsert, &account).await?;
583 /// println!("Upserted account with ID: {}", response.id);
584 /// # Ok(())
585 /// # }
586 /// ```
587 #[instrument(skip(self, data))]
588 pub async fn upsert<T: Serialize>(
589 &self,
590 sobject: &str,
591 builder: UpsertBuilder,
592 data: &T,
593 ) -> SfResult<InsertResponse> {
594 self.rate_limiter.acquire().await?;
595
596 let result = retry::with_retry(&self.config.retry_config, || async {
597 self.crud.upsert(sobject, builder.clone(), data).await
598 })
599 .await?;
600
601 // Invalidate cache
602 self.query_cache.clear().await;
603
604 Ok(result)
605 }
606
607 // ========================================================================
608 // Utility Methods
609 // ========================================================================
610
611 /// Clear the query cache
612 pub async fn clear_cache(&self) {
613 self.query_cache.clear().await;
614 info!("Cache cleared");
615 }
616
617 /// Get the current configuration
618 pub fn config(&self) -> &ClientConfig {
619 &self.config
620 }
621
622 /// Get rate limiter status
623 pub fn rate_limit_status(&self) -> rate_limit::RateLimitStatus {
624 self.rate_limiter.status()
625 }
626}
627
#[cfg(test)]
mod tests {
    use super::*;

    /// Instance URL shared by the tests below.
    const TEST_URL: &str = "https://test.salesforce.com";

    /// Token shared by the tests below.
    const TEST_TOKEN: &str = "test_token";

    /// The fluent builder keeps the base URL and honours `no_pagination`.
    #[test]
    fn test_client_config_builder() {
        let built = ClientConfig::new(TEST_URL, TEST_TOKEN)
            .with_cache(CacheConfig::disabled())
            .no_pagination();

        assert_eq!(built.base_url, "https://test.salesforce.com");
        assert!(!built.auto_paginate);
    }

    /// A client built from a configuration exposes that configuration.
    #[test]
    fn test_client_creation() {
        let client = SalesforceClient::new(ClientConfig::new(TEST_URL, TEST_TOKEN));

        assert_eq!(client.config.base_url, "https://test.salesforce.com");
    }
}