Skip to main content

internetarchive_rs/
workflow.rs

1//! Higher-level item publication and update workflows.
2
3use crate::client::InternetArchiveClient;
4use crate::error::InternetArchiveError;
5use crate::metadata::{metadata_contains_projection, ItemMetadata};
6use crate::model::Item;
7use crate::upload::{FileConflictPolicy, UploadOptions, UploadSpec};
8use crate::ItemIdentifier;
9
10/// Request used by high-level publish and upsert helpers.
11#[derive(Clone, Debug, PartialEq)]
12pub struct PublishRequest {
13    /// Item identifier to create or update.
14    pub identifier: ItemIdentifier,
15    /// Desired metadata document.
16    pub metadata: ItemMetadata,
17    /// Files to upload.
18    pub uploads: Vec<UploadSpec>,
19    /// Conflict policy for uploads targeting existing names.
20    pub conflict_policy: FileConflictPolicy,
21    /// Per-upload options.
22    pub upload_options: UploadOptions,
23}
24
25impl PublishRequest {
26    /// Creates a new publish request with default overwrite behavior.
27    #[must_use]
28    pub fn new(
29        identifier: ItemIdentifier,
30        metadata: ItemMetadata,
31        uploads: Vec<UploadSpec>,
32    ) -> Self {
33        Self {
34            identifier,
35            metadata,
36            uploads,
37            conflict_policy: FileConflictPolicy::Overwrite,
38            upload_options: UploadOptions::default(),
39        }
40    }
41}
42
43/// Result returned by high-level publish or upsert helpers.
44#[derive(Clone, Debug, PartialEq)]
45pub struct PublishOutcome {
46    /// Final item state after the workflow.
47    pub item: Item,
48    /// Whether the item was created during this workflow.
49    pub created: bool,
50    /// File names uploaded during this workflow.
51    pub uploaded_files: Vec<String>,
52    /// File names skipped because of the selected policy.
53    pub skipped_files: Vec<String>,
54    /// Whether metadata was updated through MDAPI after the upload step.
55    pub metadata_changed: bool,
56}
57
58impl InternetArchiveClient {
59    /// Creates a brand-new item and uploads all requested files.
60    ///
61    /// # Errors
62    ///
63    /// Returns an error if the identifier is not valid for IA-S3 bucket
64    /// creation, the item already exists, the request has no files, or any
65    /// network step fails.
66    pub async fn publish_item(
67        &self,
68        request: PublishRequest,
69    ) -> Result<PublishOutcome, InternetArchiveError> {
70        request.identifier.validate_for_bucket_creation()?;
71
72        match self.get_item(&request.identifier).await {
73            Ok(_) => Err(InternetArchiveError::InvalidState(format!(
74                "item {} already exists",
75                request.identifier
76            ))),
77            Err(InternetArchiveError::ItemNotFound { .. }) => {
78                self.create_or_update_item(request, None, true).await
79            }
80            Err(error) => Err(error),
81        }
82    }
83
84    /// Creates or updates an item using the provided upload conflict policy.
85    ///
86    /// # Errors
87    ///
88    /// Returns an error if the identifier is not valid for IA-S3 bucket
89    /// creation when a new item must be created, or if any required network
90    /// step fails.
91    pub async fn upsert_item(
92        &self,
93        request: PublishRequest,
94    ) -> Result<PublishOutcome, InternetArchiveError> {
95        let existing = match self.get_item(&request.identifier).await {
96            Ok(item) => Some(item),
97            Err(InternetArchiveError::ItemNotFound { .. }) => None,
98            Err(error) => return Err(error),
99        };
100        self.create_or_update_item(request, existing, false).await
101    }
102
103    async fn create_or_update_item(
104        &self,
105        request: PublishRequest,
106        existing: Option<Item>,
107        must_create: bool,
108    ) -> Result<PublishOutcome, InternetArchiveError> {
109        if request.uploads.is_empty() {
110            return Err(InternetArchiveError::InvalidState(
111                "Internet Archive item workflows require at least one upload".to_owned(),
112            ));
113        }
114
115        let created = existing.is_none();
116
117        if must_create && existing.is_some() {
118            return Err(InternetArchiveError::InvalidState(format!(
119                "item {} already exists",
120                request.identifier
121            )));
122        }
123
124        let mut uploaded_files = Vec::new();
125        let mut skipped_files = Vec::new();
126        let mut metadata_changed;
127
128        if let Some(existing) = existing.as_ref() {
129            for spec in &request.uploads {
130                let already_present = existing.file(&spec.filename).is_some();
131                match (already_present, request.conflict_policy) {
132                    (true, FileConflictPolicy::Error) => {
133                        return Err(InternetArchiveError::UploadConflict {
134                            filename: spec.filename.clone(),
135                        });
136                    }
137                    (true, FileConflictPolicy::Skip) => {
138                        skipped_files.push(spec.filename.clone());
139                    }
140                    (true, FileConflictPolicy::OverwriteKeepingHistory) => {
141                        let mut options = request.upload_options.clone();
142                        options.keep_old_version = true;
143                        self.upload_file(&request.identifier, spec, &options)
144                            .await?;
145                        uploaded_files.push(spec.filename.clone());
146                    }
147                    _ => {
148                        self.upload_file(&request.identifier, spec, &request.upload_options)
149                            .await?;
150                        uploaded_files.push(spec.filename.clone());
151                    }
152                }
153            }
154
155            let response = self
156                .update_item_metadata(&request.identifier, &request.metadata)
157                .await?;
158            metadata_changed = response.task_id.is_some();
159        } else {
160            metadata_changed = !request
161                .metadata
162                .as_header_encoding()
163                .remainder
164                .as_map()
165                .is_empty();
166            let first = &request.uploads[0];
167            self.create_item(
168                &request.identifier,
169                &request.metadata,
170                first,
171                &request.upload_options,
172            )
173            .await?;
174            uploaded_files.push(first.filename.clone());
175
176            for spec in request.uploads.iter().skip(1) {
177                self.upload_file(&request.identifier, spec, &request.upload_options)
178                    .await?;
179                uploaded_files.push(spec.filename.clone());
180            }
181
182            let current = self.wait_for_item(&request.identifier).await?;
183            if !metadata_contains_projection(&current.metadata, &request.metadata) {
184                let response = self
185                    .update_item_metadata(&request.identifier, &request.metadata)
186                    .await?;
187                metadata_changed = response.task_id.is_some();
188            }
189        }
190
191        let item = self
192            .wait_for_item_projection(&request.identifier, &uploaded_files, &request.metadata)
193            .await?;
194        Ok(PublishOutcome {
195            item,
196            created,
197            uploaded_files,
198            skipped_files,
199            metadata_changed,
200        })
201    }
202}
203
#[cfg(test)]
mod tests {
    use super::PublishRequest;
    use crate::client::InternetArchiveClient;
    use crate::error::InternetArchiveError;
    use crate::metadata::{ItemMetadata, MediaType};
    use crate::upload::{FileConflictPolicy, UploadSpec};
    use crate::{IdentifierError, ItemIdentifier};

    /// `PublishRequest::new` keeps its inputs and defaults to overwriting conflicts.
    #[test]
    fn publish_request_defaults_are_sensible() {
        let request = PublishRequest::new(
            ItemIdentifier::new("demo-item").unwrap(),
            ItemMetadata::builder()
                .mediatype(MediaType::Texts)
                .title("Demo")
                .build(),
            vec![UploadSpec::from_bytes("demo.txt", b"hello")],
        );

        assert_eq!(request.uploads.len(), 1);
        assert_eq!(request.uploads[0].filename, "demo.txt");
        assert_eq!(request.identifier.as_str(), "demo-item");
        // The documented default: conflicting uploads overwrite existing files.
        assert!(matches!(
            request.conflict_policy,
            FileConflictPolicy::Overwrite
        ));
    }

    #[tokio::test]
    async fn create_or_update_item_rejects_empty_upload_lists_before_network_access() {
        let client = InternetArchiveClient::new().unwrap();
        let request = PublishRequest::new(
            ItemIdentifier::new("demo-item").unwrap(),
            ItemMetadata::builder().title("Demo").build(),
            Vec::new(),
        );

        let error = client
            .create_or_update_item(request, None, false)
            .await
            .unwrap_err();
        assert!(
            matches!(error, InternetArchiveError::InvalidState(message) if message.contains("at least one upload"))
        );
    }

    #[tokio::test]
    async fn publish_rejects_bucket_unsafe_identifiers_before_lookup() {
        let client = InternetArchiveClient::new().unwrap();
        let request = PublishRequest::new(
            ItemIdentifier::new("Demo-item").unwrap(),
            ItemMetadata::builder().title("Demo").build(),
            vec![UploadSpec::from_bytes("demo.txt", b"hello")],
        );

        assert!(matches!(
            client.publish_item(request).await.unwrap_err(),
            InternetArchiveError::Identifier(IdentifierError::InvalidBucketCreationCharacter {
                character: 'D',
                ..
            })
        ));
    }

    #[tokio::test]
    async fn create_or_update_item_rejects_existing_items_when_creation_is_forced() {
        let client = InternetArchiveClient::new().unwrap();
        let request = PublishRequest::new(
            ItemIdentifier::new("demo-item").unwrap(),
            ItemMetadata::builder().title("Demo").build(),
            vec![UploadSpec::from_bytes("demo.txt", b"hello")],
        );
        let existing = serde_json::from_value(serde_json::json!({
            "files": [],
            "metadata": {"identifier": "demo-item"}
        }))
        .unwrap();

        let error = client
            .create_or_update_item(request, Some(existing), true)
            .await
            .unwrap_err();
        assert!(
            matches!(error, InternetArchiveError::InvalidState(message) if message.contains("already exists"))
        );
    }
}
286}