// akribes_sdk/sub/documents.rs
use std::sync::Arc;
use std::time::{Duration, Instant};

use sha2::{Digest, Sha256};

use crate::client::{AkribesClient, Inner};
use crate::error::{AkribesError, Result};
use crate::models::*;

13#[derive(Clone, Debug)]
16pub struct DocumentsClient {
17 inner: Arc<Inner>,
18 project_id: i64,
19}
20
21impl DocumentsClient {
22 pub(crate) fn new(inner: Arc<Inner>, project_id: i64) -> Self {
23 Self { inner, project_id }
24 }
25
26 fn c(&self) -> AkribesClient {
27 AkribesClient {
28 inner: Arc::clone(&self.inner),
29 }
30 }
31
32 fn base_url(&self) -> String {
33 format!(
34 "{}/projects/{}/documents",
35 self.inner.base_url, self.project_id
36 )
37 }
38
39 pub async fn progress(&self, content_hash: &str) -> Result<Option<IngestProgress>> {
45 let url = format!(
46 "{}/by-hash/{}/progress",
47 self.base_url(),
48 urlencoding::encode(content_hash),
49 );
50 let res = self.c().send(self.c().inner.http.get(&url)).await?;
51 let wire: ProgressResponseWire = crate::client::decode_json(res).await?;
52 Ok(match wire {
53 ProgressResponseWire::Converting {
54 done_pages,
55 total_pages,
56 } => Some(IngestProgress {
57 done: done_pages,
58 total: total_pages,
59 }),
60 ProgressResponseWire::Idle => None,
61 })
62 }
63
64 pub async fn claim(&self, content_hash: &str, filename: &str) -> Result<ClaimOutcome> {
73 let url = format!("{}/claim", self.base_url());
74 let wire: ClaimResponseWire = self
75 .c()
76 .post(
77 &url,
78 &ClaimRequest {
79 content_hash,
80 filename,
81 },
82 )
83 .await?;
84 Ok(match wire {
85 ClaimResponseWire::Hit {
86 document_id,
87 filename,
88 content_hash,
89 conversion_status,
90 } => ClaimOutcome::Hit(UploadResult {
91 document_id,
92 filename,
93 content_hash,
94 conversion_status,
95 }),
96 ClaimResponseWire::Miss => ClaimOutcome::Miss,
97 })
98 }
99
100 pub async fn upload(&self, filename: &str, bytes: Vec<u8>) -> Result<UploadResult> {
103 let url = self.base_url();
104 let part = reqwest::multipart::Part::bytes(bytes)
105 .file_name(filename.to_string())
106 .mime_str("application/octet-stream")
107 .expect("valid MIME string");
108 let form = reqwest::multipart::Form::new().part("file", part);
109 self.c().post_multipart::<UploadResult>(&url, form).await
110 }
111
112 pub async fn ingest(&self, filename: &str, bytes: Vec<u8>) -> Result<UploadResult> {
121 let content_hash = hex::encode(Sha256::digest(&bytes));
122 let poll_timeout = self.inner.ingest_poll_timeout;
123
124 let result = match self.claim(&content_hash, filename).await? {
125 ClaimOutcome::Hit(mut r) => {
126 let deadline = std::time::Instant::now() + poll_timeout;
128 let mut backoff = std::time::Duration::from_millis(250);
129 while matches!(
130 r.conversion_status,
131 ConversionStatus::Converting | ConversionStatus::Pending
132 ) {
133 if std::time::Instant::now() >= deadline {
134 return Err(AkribesError::Transient {
135 message: format!(
136 "document {} still converting after {}s",
137 r.document_id,
138 poll_timeout.as_secs(),
139 ),
140 execution_id: None,
141 retry_after: None,
142 status: None,
143 });
144 }
145 tokio::time::sleep(backoff).await;
146 backoff = std::cmp::min(backoff * 2, std::time::Duration::from_secs(2));
147 match self.claim(&content_hash, filename).await? {
148 ClaimOutcome::Hit(new_r) => r = new_r,
149 ClaimOutcome::Miss => {
150 return self.upload(filename, bytes).await;
153 }
154 }
155 }
156 r
157 }
158 ClaimOutcome::Miss => self.upload(filename, bytes).await?,
159 };
160
161 if result.conversion_status == ConversionStatus::Failed {
162 return Err(AkribesError::Other(format!(
163 "document {} conversion failed on the server — re-upload or call reconvert",
164 result.document_id
165 )));
166 }
167 Ok(result)
168 }
169}