Skip to main content

akribes_sdk/sub/
documents.rs

1//! Content-addressable document ingest sub-client.
2//!
3//! See the companion spec in akribes's `docs/superpowers/specs/`.
4
5use std::sync::Arc;
6
7use sha2::{Digest, Sha256};
8
9use crate::client::{AkribesClient, Inner};
10use crate::error::{AkribesError, Result};
11use crate::models::*;
12
13/// Sub-client for `POST /projects/{pid}/documents{,/claim}`. Obtained via
14/// [`crate::client::ProjectScope::documents`].
15#[derive(Clone, Debug)]
16pub struct DocumentsClient {
17    inner: Arc<Inner>,
18    project_id: i64,
19}
20
21impl DocumentsClient {
22    pub(crate) fn new(inner: Arc<Inner>, project_id: i64) -> Self {
23        Self { inner, project_id }
24    }
25
26    fn c(&self) -> AkribesClient {
27        AkribesClient {
28            inner: Arc::clone(&self.inner),
29        }
30    }
31
32    fn base_url(&self) -> String {
33        format!(
34            "{}/projects/{}/documents",
35            self.inner.base_url, self.project_id
36        )
37    }
38
39    /// Snapshot the server-side conversion progress for a content hash (#1151).
40    /// Returns `None` if no in-flight conversion is registered (terminal
41    /// already, or never uploaded). Cheap — a few-byte JSON response off an
42    /// in-memory map. Mirrors TS `documents.progress` and Python
43    /// `documents.progress`.
44    pub async fn progress(&self, content_hash: &str) -> Result<Option<IngestProgress>> {
45        let url = format!(
46            "{}/by-hash/{}/progress",
47            self.base_url(),
48            urlencoding::encode(content_hash),
49        );
50        let res = self.c().send(self.c().inner.http.get(&url)).await?;
51        let wire: ProgressResponseWire = crate::client::decode_json(res).await?;
52        Ok(match wire {
53            ProgressResponseWire::Converting {
54                done_pages,
55                total_pages,
56            } => Some(IngestProgress {
57                done: done_pages,
58                total: total_pages,
59            }),
60            ProgressResponseWire::Idle => None,
61        })
62    }
63
64    /// Check whether the server has these bytes cached (by content_hash).
65    /// On hit, the server creates-or-finds a per-project ref and the SDK
66    /// returns [`ClaimOutcome::Hit`] with the `doc_id` and current conversion
67    /// status. On miss, the caller must [`upload`](Self::upload) the bytes.
68    ///
69    /// The `content_hash` returned in [`UploadResult`] comes from the server's
70    /// response, not the caller's argument — so a caller that passes a wrong
71    /// hash gets the server's authoritative value back.
72    pub async fn claim(&self, content_hash: &str, filename: &str) -> Result<ClaimOutcome> {
73        let url = format!("{}/claim", self.base_url());
74        let wire: ClaimResponseWire = self
75            .c()
76            .post(
77                &url,
78                &ClaimRequest {
79                    content_hash,
80                    filename,
81                },
82            )
83            .await?;
84        Ok(match wire {
85            ClaimResponseWire::Hit {
86                document_id,
87                filename,
88                content_hash,
89                conversion_status,
90            } => ClaimOutcome::Hit(UploadResult {
91                document_id,
92                filename,
93                content_hash,
94                conversion_status,
95            }),
96            ClaimResponseWire::Miss => ClaimOutcome::Miss,
97        })
98    }
99
100    /// Upload bytes. Server hashes, dedups against the `blobs` table, creates
101    /// a per-project ref, returns a `doc_<uuid>` scoped to this project.
102    pub async fn upload(&self, filename: &str, bytes: Vec<u8>) -> Result<UploadResult> {
103        let url = self.base_url();
104        let part = reqwest::multipart::Part::bytes(bytes)
105            .file_name(filename.to_string())
106            .mime_str("application/octet-stream")
107            .expect("valid MIME string");
108        let form = reqwest::multipart::Form::new().part("file", part);
109        self.c().post_multipart::<UploadResult>(&url, form).await
110    }
111
112    /// Convenience: hash locally, call [`claim`](Self::claim), fall back to
113    /// [`upload`](Self::upload) on miss. On hit where the blob is still
114    /// `Converting`, polls the claim endpoint until the status is terminal or
115    /// the configured ingest poll timeout elapses (default 300 s, see
116    /// [`crate::AkribesClientBuilder::ingest_poll_timeout`]). Returns
117    /// `AkribesError::Transient` on timeout so the caller can retry. If the
118    /// server reports `Failed` (after its own inline auto-reconvert has given
119    /// up), returns `AkribesError::Other`.
120    pub async fn ingest(&self, filename: &str, bytes: Vec<u8>) -> Result<UploadResult> {
121        let content_hash = hex::encode(Sha256::digest(&bytes));
122        let poll_timeout = self.inner.ingest_poll_timeout;
123
124        let result = match self.claim(&content_hash, filename).await? {
125            ClaimOutcome::Hit(mut r) => {
126                // If still converting, poll until terminal.
127                let deadline = std::time::Instant::now() + poll_timeout;
128                let mut backoff = std::time::Duration::from_millis(250);
129                while matches!(
130                    r.conversion_status,
131                    ConversionStatus::Converting | ConversionStatus::Pending
132                ) {
133                    if std::time::Instant::now() >= deadline {
134                        return Err(AkribesError::Transient {
135                            message: format!(
136                                "document {} still converting after {}s",
137                                r.document_id,
138                                poll_timeout.as_secs(),
139                            ),
140                            execution_id: None,
141                            retry_after: None,
142                            status: None,
143                        });
144                    }
145                    tokio::time::sleep(backoff).await;
146                    backoff = std::cmp::min(backoff * 2, std::time::Duration::from_secs(2));
147                    match self.claim(&content_hash, filename).await? {
148                        ClaimOutcome::Hit(new_r) => r = new_r,
149                        ClaimOutcome::Miss => {
150                            // Blob vanished mid-poll (likely GC or reconvert
151                            // claimed it). Fall through to upload to repopulate.
152                            return self.upload(filename, bytes).await;
153                        }
154                    }
155                }
156                r
157            }
158            ClaimOutcome::Miss => self.upload(filename, bytes).await?,
159        };
160
161        if result.conversion_status == ConversionStatus::Failed {
162            return Err(AkribesError::Other(format!(
163                "document {} conversion failed on the server — re-upload or call reconvert",
164                result.document_id
165            )));
166        }
167        Ok(result)
168    }
169}