Skip to main content

force_sync/apply/
salesforce.rs

1//! Salesforce REST apply helpers.
2
3use force::{
4    api::RestOperation,
5    api::SoqlQueryBuilder,
6    api::bulk::{JobOperation, SmartIngestResult},
7    auth::Authenticator,
8    client::ForceClient,
9    error::{ForceError, HttpError},
10    types::DynamicSObject,
11    types::SalesforceId,
12};
13use futures::stream;
14use serde::Serialize;
15use serde_json::Value;
16
17/// Result of applying a REST upsert to Salesforce.
18#[derive(Debug, Clone, PartialEq, Eq)]
19pub struct RestApplyResult {
20    /// Salesforce ID returned by the API, if available.
21    pub salesforce_id: Option<SalesforceId>,
22    /// Whether the upsert created a new record.
23    pub created: bool,
24}
25
26/// Error returned when a Salesforce apply operation fails.
27#[derive(Debug, thiserror::Error)]
28pub enum ApplyError {
29    /// A transient failure occurred and the operation can be retried.
30    #[error("retryable Salesforce apply error: {0}")]
31    Retryable(ForceError),
32
33    /// A non-retryable failure occurred.
34    #[error("permanent Salesforce apply error: {0}")]
35    Permanent(ForceError),
36}
37
38/// Minimal Salesforce REST applier for force-sync lanes.
39#[derive(Debug, Clone)]
40pub struct SalesforceApplier<A: Authenticator> {
41    client: ForceClient<A>,
42}
43
44impl<A: Authenticator> SalesforceApplier<A> {
45    /// Creates a new applier from an authenticated `force` client.
46    #[must_use]
47    pub const fn new(client: ForceClient<A>) -> Self {
48        Self { client }
49    }
50
51    /// Applies an external-ID upsert using the idempotent REST lane.
52    ///
53    /// # Errors
54    ///
55    /// Returns a retryable or permanent apply error if the request fails.
56    pub async fn apply_rest_upsert(
57        &self,
58        sobject: &str,
59        external_id_field: &str,
60        external_id_value: &str,
61        payload: &Value,
62    ) -> Result<RestApplyResult, ApplyError> {
63        let rest = self.client.rest();
64
65        match rest
66            .upsert_idempotent(sobject, external_id_field, external_id_value, payload)
67            .await
68        {
69            Ok(response) => Ok(RestApplyResult {
70                salesforce_id: Some(response.id),
71                created: response.created,
72            }),
73            Err(error) if is_missing_id_upsert_update(&error) => Ok(RestApplyResult {
74                salesforce_id: Some(
75                    self.resolve_salesforce_id(sobject, external_id_field, external_id_value)
76                        .await?,
77                ),
78                created: false,
79            }),
80            Err(error) => Err(classify_force_error(error)),
81        }
82    }
83
84    /// Applies a REST delete using a known Salesforce record ID.
85    ///
86    /// # Errors
87    ///
88    /// Returns a retryable or permanent apply error if the request fails.
89    pub async fn apply_rest_delete(
90        &self,
91        sobject: &str,
92        salesforce_id: &SalesforceId,
93    ) -> Result<(), ApplyError> {
94        match self.client.rest().delete(sobject, salesforce_id).await {
95            Ok(_) => Ok(()),
96            Err(error) => {
97                if matches!(
98                    error,
99                    ForceError::Http(HttpError::StatusError {
100                        status_code: 404,
101                        ..
102                    })
103                ) {
104                    Ok(())
105                } else {
106                    Err(classify_force_error(error))
107                }
108            }
109        }
110    }
111
112    /// Applies a bulk upsert using an external ID field and configured batch size.
113    ///
114    /// # Errors
115    ///
116    /// Returns a retryable or permanent apply error if the bulk job fails.
117    pub async fn apply_bulk_upsert<T>(
118        &self,
119        sobject: &str,
120        external_id_field: &str,
121        batch_size: usize,
122        records: Vec<T>,
123    ) -> Result<SmartIngestResult, ApplyError>
124    where
125        T: Serialize + Send + Sync,
126    {
127        self.client
128            .bulk()
129            .smart_ingest(sobject, JobOperation::Upsert)
130            .external_id_field(external_id_field)
131            .batch_size(batch_size)
132            .execute_stream(stream::iter(records))
133            .await
134            .map_err(classify_force_error)
135    }
136
137    async fn resolve_salesforce_id(
138        &self,
139        sobject: &str,
140        external_id_field: &str,
141        external_id_value: &str,
142    ) -> Result<SalesforceId, ApplyError> {
143        let query = SoqlQueryBuilder::new()
144            .select(&["Id"])
145            .from(sobject)
146            .where_eq(external_id_field, external_id_value)
147            .limit(1)
148            .build();
149
150        let result = self
151            .client
152            .rest()
153            .query::<DynamicSObject>(&query)
154            .await
155            .map_err(classify_force_error)?;
156
157        let Some(record) = result.records.into_iter().next() else {
158            return Err(missing_follow_up_id_error(
159                sobject,
160                external_id_field,
161                external_id_value,
162            ));
163        };
164
165        let Some(id) = record.get_field_as::<String>("Id").map_err(|error| {
166            ApplyError::Permanent(ForceError::InvalidInput(format!(
167                "invalid Salesforce query result: {error}"
168            )))
169        })?
170        else {
171            return Err(missing_follow_up_id_error(
172                sobject,
173                external_id_field,
174                external_id_value,
175            ));
176        };
177
178        SalesforceId::new(&id).map_err(|error| {
179            ApplyError::Permanent(ForceError::InvalidInput(format!(
180                "invalid Salesforce query result ID: {error}"
181            )))
182        })
183    }
184}
185
186fn is_missing_id_upsert_update(error: &ForceError) -> bool {
187    matches!(
188        error,
189        ForceError::NotImplemented(message)
190            if message.contains("204") && message.contains("record ID")
191    )
192}
193
194const fn classify_force_error(error: ForceError) -> ApplyError {
195    if is_retryable_force_error(&error) {
196        ApplyError::Retryable(error)
197    } else {
198        ApplyError::Permanent(error)
199    }
200}
201
202fn missing_follow_up_id_error(
203    sobject: &str,
204    external_id_field: &str,
205    external_id_value: &str,
206) -> ApplyError {
207    ApplyError::Permanent(ForceError::InvalidInput(format!(
208        "upsert updated existing {sobject} via {external_id_field}={external_id_value}, but follow-up lookup did not return a Salesforce Id"
209    )))
210}
211
212const fn is_retryable_force_error(error: &ForceError) -> bool {
213    match error {
214        ForceError::Http(HttpError::StatusError { status_code, .. }) => {
215            *status_code == 408 || *status_code == 409 || *status_code == 429 || *status_code >= 500
216        }
217        ForceError::Http(
218            HttpError::RateLimitExceeded { .. }
219            | HttpError::Timeout { .. }
220            | HttpError::RequestFailed(_),
221        ) => true,
222        _ => false,
223    }
224}