1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
//! Content-addressable document ingest sub-client.
//!
//! See the companion spec in akribes's `docs/superpowers/specs/`.
use std::sync::Arc;
use sha2::{Digest, Sha256};
use crate::client::{AkribesClient, Inner};
use crate::error::{AkribesError, Result};
use crate::models::*;
/// Project-scoped handle for the document ingest endpoints
/// (`POST /projects/{pid}/documents` and `POST /projects/{pid}/documents/claim`).
///
/// Instances are handed out by [`crate::client::ProjectScope::documents`].
#[derive(Debug, Clone)]
pub struct DocumentsClient {
    /// Shared client state; this module reads the base URL, the HTTP client
    /// and the ingest poll timeout from it.
    inner: Arc<Inner>,
    /// Project whose id is interpolated into every request URL.
    project_id: i64,
}
impl DocumentsClient {
    /// Build a sub-client bound to `project_id`, sharing `inner` with the
    /// parent client.
    pub(crate) fn new(inner: Arc<Inner>, project_id: i64) -> Self {
        Self { inner, project_id }
    }

    /// Materialise an [`AkribesClient`] over the same shared state so its
    /// request helpers (`send`, `post`, `post_multipart`) can be reused.
    fn c(&self) -> AkribesClient {
        AkribesClient {
            inner: Arc::clone(&self.inner),
        }
    }

    /// `{base_url}/projects/{project_id}/documents` — the prefix of every
    /// endpoint this sub-client talks to.
    fn base_url(&self) -> String {
        format!(
            "{}/projects/{}/documents",
            self.inner.base_url, self.project_id
        )
    }

    /// Snapshot the server-side conversion progress for a content hash (#1151).
    ///
    /// Returns `Ok(None)` if no in-flight conversion is registered (terminal
    /// already, or never uploaded). Cheap — a few-byte JSON response off an
    /// in-memory map. Mirrors TS `documents.progress` and Python
    /// `documents.progress`.
    ///
    /// # Errors
    ///
    /// Propagates any error from sending the request or decoding the JSON
    /// response.
    pub async fn progress(&self, content_hash: &str) -> Result<Option<IngestProgress>> {
        let url = format!(
            "{}/by-hash/{}/progress",
            self.base_url(),
            urlencoding::encode(content_hash),
        );
        // Build the request straight off the shared HTTP client; only `send`
        // needs the `AkribesClient` wrapper.
        let res = self.c().send(self.inner.http.get(&url)).await?;
        let wire: ProgressResponseWire = crate::client::decode_json(res).await?;
        Ok(match wire {
            ProgressResponseWire::Converting {
                done_pages,
                total_pages,
            } => Some(IngestProgress {
                done: done_pages,
                total: total_pages,
            }),
            ProgressResponseWire::Idle => None,
        })
    }

    /// Check whether the server has these bytes cached (by content_hash).
    ///
    /// On hit, the server creates-or-finds a per-project ref and the SDK
    /// returns [`ClaimOutcome::Hit`] with the `doc_id` and current conversion
    /// status. On miss, the caller must [`upload`](Self::upload) the bytes.
    ///
    /// The `content_hash` returned in [`UploadResult`] comes from the server's
    /// response, not the caller's argument — so a caller that passes a wrong
    /// hash gets the server's authoritative value back.
    ///
    /// # Errors
    ///
    /// Propagates any error from the underlying `POST .../claim` request.
    pub async fn claim(&self, content_hash: &str, filename: &str) -> Result<ClaimOutcome> {
        let url = format!("{}/claim", self.base_url());
        let wire: ClaimResponseWire = self
            .c()
            .post(
                &url,
                &ClaimRequest {
                    content_hash,
                    filename,
                },
            )
            .await?;
        Ok(match wire {
            ClaimResponseWire::Hit {
                document_id,
                filename,
                content_hash,
                conversion_status,
            } => ClaimOutcome::Hit(UploadResult {
                document_id,
                filename,
                content_hash,
                conversion_status,
            }),
            ClaimResponseWire::Miss => ClaimOutcome::Miss,
        })
    }

    /// Upload bytes. Server hashes, dedups against the `blobs` table, creates
    /// a per-project ref, returns a `doc_<uuid>` scoped to this project.
    ///
    /// The payload is sent as a single multipart part named `file` with MIME
    /// type `application/octet-stream`.
    ///
    /// # Errors
    ///
    /// Propagates any error from the underlying multipart `POST`.
    pub async fn upload(&self, filename: &str, bytes: Vec<u8>) -> Result<UploadResult> {
        let url = self.base_url();
        let part = reqwest::multipart::Part::bytes(bytes)
            .file_name(filename.to_string())
            .mime_str("application/octet-stream")
            // The MIME string is a fixed literal, so a parse failure here
            // would be a programming error rather than a runtime condition.
            .expect("valid MIME string");
        let form = reqwest::multipart::Form::new().part("file", part);
        self.c().post_multipart::<UploadResult>(&url, form).await
    }

    /// Convenience: hash locally, call [`claim`](Self::claim), fall back to
    /// [`upload`](Self::upload) on miss.
    ///
    /// On hit where the blob is still `Pending` or `Converting`, polls the
    /// claim endpoint (250 ms initial backoff, doubling up to 2 s) until the
    /// status is terminal or the configured ingest poll timeout elapses
    /// (default 300 s, see [`crate::AkribesClientBuilder::ingest_poll_timeout`]).
    /// If the blob disappears while polling, the bytes are uploaded again and
    /// that upload's result is used.
    ///
    /// # Errors
    ///
    /// * `AkribesError::Transient` if the document is still converting when
    ///   the poll timeout elapses, so the caller can retry.
    /// * `AkribesError::Other` if the server reports `Failed` (after its own
    ///   inline auto-reconvert has given up) — regardless of whether that
    ///   status came from a claim or from an upload.
    /// * Any error propagated from [`claim`](Self::claim) or
    ///   [`upload`](Self::upload).
    pub async fn ingest(&self, filename: &str, bytes: Vec<u8>) -> Result<UploadResult> {
        let content_hash = hex::encode(Sha256::digest(&bytes));
        let poll_timeout = self.inner.ingest_poll_timeout;
        let result = match self.claim(&content_hash, filename).await? {
            ClaimOutcome::Hit(mut r) => {
                // If still converting, poll until terminal.
                let deadline = std::time::Instant::now() + poll_timeout;
                let mut backoff = std::time::Duration::from_millis(250);
                while matches!(
                    r.conversion_status,
                    ConversionStatus::Converting | ConversionStatus::Pending
                ) {
                    let now = std::time::Instant::now();
                    if now >= deadline {
                        return Err(AkribesError::Transient {
                            message: format!(
                                "document {} still converting after {}s",
                                r.document_id,
                                poll_timeout.as_secs(),
                            ),
                            execution_id: None,
                            retry_after: None,
                            status: None,
                        });
                    }
                    // Never sleep past the deadline: the last poll happens at
                    // the deadline instead of up to one full backoff after it.
                    let nap = std::cmp::min(backoff, deadline.saturating_duration_since(now));
                    tokio::time::sleep(nap).await;
                    backoff = std::cmp::min(backoff * 2, std::time::Duration::from_secs(2));
                    match self.claim(&content_hash, filename).await? {
                        ClaimOutcome::Hit(new_r) => r = new_r,
                        ClaimOutcome::Miss => {
                            // Blob vanished mid-poll (likely GC or reconvert
                            // claimed it). Upload to repopulate, then leave the
                            // loop so the result goes through the same `Failed`
                            // check as the initial-miss path below.
                            r = self.upload(filename, bytes).await?;
                            break;
                        }
                    }
                }
                r
            }
            ClaimOutcome::Miss => self.upload(filename, bytes).await?,
        };
        if result.conversion_status == ConversionStatus::Failed {
            return Err(AkribesError::Other(format!(
                "document {} conversion failed on the server — re-upload or call reconvert",
                result.document_id
            )));
        }
        Ok(result)
    }
}