force_sync/apply/
salesforce.rs1use force::{
4 api::RestOperation,
5 api::SoqlQueryBuilder,
6 api::bulk::{JobOperation, SmartIngestResult},
7 auth::Authenticator,
8 client::ForceClient,
9 error::{ForceError, HttpError},
10 types::DynamicSObject,
11 types::SalesforceId,
12};
13use futures::stream;
14use serde::Serialize;
15use serde_json::Value;
16
17#[derive(Debug, Clone, PartialEq, Eq)]
19pub struct RestApplyResult {
20 pub salesforce_id: Option<SalesforceId>,
22 pub created: bool,
24}
25
26#[derive(Debug, thiserror::Error)]
28pub enum ApplyError {
29 #[error("retryable Salesforce apply error: {0}")]
31 Retryable(ForceError),
32
33 #[error("permanent Salesforce apply error: {0}")]
35 Permanent(ForceError),
36}
37
38#[derive(Debug, Clone)]
40pub struct SalesforceApplier<A: Authenticator> {
41 client: ForceClient<A>,
42}
43
44impl<A: Authenticator> SalesforceApplier<A> {
45 #[must_use]
47 pub const fn new(client: ForceClient<A>) -> Self {
48 Self { client }
49 }
50
51 pub async fn apply_rest_upsert(
57 &self,
58 sobject: &str,
59 external_id_field: &str,
60 external_id_value: &str,
61 payload: &Value,
62 ) -> Result<RestApplyResult, ApplyError> {
63 let rest = self.client.rest();
64
65 match rest
66 .upsert_idempotent(sobject, external_id_field, external_id_value, payload)
67 .await
68 {
69 Ok(response) => Ok(RestApplyResult {
70 salesforce_id: Some(response.id),
71 created: response.created,
72 }),
73 Err(error) if is_missing_id_upsert_update(&error) => Ok(RestApplyResult {
74 salesforce_id: Some(
75 self.resolve_salesforce_id(sobject, external_id_field, external_id_value)
76 .await?,
77 ),
78 created: false,
79 }),
80 Err(error) => Err(classify_force_error(error)),
81 }
82 }
83
84 pub async fn apply_rest_delete(
90 &self,
91 sobject: &str,
92 salesforce_id: &SalesforceId,
93 ) -> Result<(), ApplyError> {
94 match self.client.rest().delete(sobject, salesforce_id).await {
95 Ok(_) => Ok(()),
96 Err(error) => {
97 if matches!(
98 error,
99 ForceError::Http(HttpError::StatusError {
100 status_code: 404,
101 ..
102 })
103 ) {
104 Ok(())
105 } else {
106 Err(classify_force_error(error))
107 }
108 }
109 }
110 }
111
112 pub async fn apply_bulk_upsert<T>(
118 &self,
119 sobject: &str,
120 external_id_field: &str,
121 batch_size: usize,
122 records: Vec<T>,
123 ) -> Result<SmartIngestResult, ApplyError>
124 where
125 T: Serialize + Send + Sync,
126 {
127 self.client
128 .bulk()
129 .smart_ingest(sobject, JobOperation::Upsert)
130 .external_id_field(external_id_field)
131 .batch_size(batch_size)
132 .execute_stream(stream::iter(records))
133 .await
134 .map_err(classify_force_error)
135 }
136
137 async fn resolve_salesforce_id(
138 &self,
139 sobject: &str,
140 external_id_field: &str,
141 external_id_value: &str,
142 ) -> Result<SalesforceId, ApplyError> {
143 let query = SoqlQueryBuilder::new()
144 .select(&["Id"])
145 .from(sobject)
146 .where_eq(external_id_field, external_id_value)
147 .limit(1)
148 .build();
149
150 let result = self
151 .client
152 .rest()
153 .query::<DynamicSObject>(&query)
154 .await
155 .map_err(classify_force_error)?;
156
157 let Some(record) = result.records.into_iter().next() else {
158 return Err(missing_follow_up_id_error(
159 sobject,
160 external_id_field,
161 external_id_value,
162 ));
163 };
164
165 let Some(id) = record.get_field_as::<String>("Id").map_err(|error| {
166 ApplyError::Permanent(ForceError::InvalidInput(format!(
167 "invalid Salesforce query result: {error}"
168 )))
169 })?
170 else {
171 return Err(missing_follow_up_id_error(
172 sobject,
173 external_id_field,
174 external_id_value,
175 ));
176 };
177
178 SalesforceId::new(&id).map_err(|error| {
179 ApplyError::Permanent(ForceError::InvalidInput(format!(
180 "invalid Salesforce query result ID: {error}"
181 )))
182 })
183 }
184}
185
186fn is_missing_id_upsert_update(error: &ForceError) -> bool {
187 matches!(
188 error,
189 ForceError::NotImplemented(message)
190 if message.contains("204") && message.contains("record ID")
191 )
192}
193
194const fn classify_force_error(error: ForceError) -> ApplyError {
195 if is_retryable_force_error(&error) {
196 ApplyError::Retryable(error)
197 } else {
198 ApplyError::Permanent(error)
199 }
200}
201
202fn missing_follow_up_id_error(
203 sobject: &str,
204 external_id_field: &str,
205 external_id_value: &str,
206) -> ApplyError {
207 ApplyError::Permanent(ForceError::InvalidInput(format!(
208 "upsert updated existing {sobject} via {external_id_field}={external_id_value}, but follow-up lookup did not return a Salesforce Id"
209 )))
210}
211
212const fn is_retryable_force_error(error: &ForceError) -> bool {
213 match error {
214 ForceError::Http(HttpError::StatusError { status_code, .. }) => {
215 *status_code == 408 || *status_code == 409 || *status_code == 429 || *status_code >= 500
216 }
217 ForceError::Http(
218 HttpError::RateLimitExceeded { .. }
219 | HttpError::Timeout { .. }
220 | HttpError::RequestFailed(_),
221 ) => true,
222 _ => false,
223 }
224}