use reqwest::multipart::Form;
use crate::client::BigRag;
use crate::core::urlencode;
use crate::error::BigRagError;
use crate::files::FileInput;
use crate::sse::SseStream;
use crate::types::common::StatusResponse;
use crate::types::documents::{
BatchDeleteDocumentsResponse, BatchGetDocumentsResponse, BatchStatusResponse, Document,
DocumentChunkListResponse, DocumentListOptions, DocumentListResponse, S3IngestBody,
S3IngestResponse, S3JobListResponse,
};
/// Borrowed handle grouping the document-related API calls.
///
/// Every method on this type goes through the `transport` (or, for
/// `get_file_url`, the `config`) of the borrowed [`BigRag`] client.
pub struct Documents<'a> {
/// Client whose transport and configuration are used for every request.
pub(crate) client: &'a BigRag,
}
impl Documents<'_> {
/// Uploads a single file to `collection` as a multipart form.
///
/// `file` is converted into a `FileInput` and attached as the `file` part.
/// When `metadata` is supplied it is serialized with `serde_json::to_string`
/// and attached as the `metadata` text field. The form is passed to the
/// transport's `post_multipart` for `/v1/collections/{collection}/documents`,
/// where `collection` is run through `urlencode`.
///
/// # Errors
///
/// Propagates failures from building the multipart part, from serializing
/// `metadata`, and from the transport call.
pub async fn upload(
    &self,
    collection: &str,
    file: impl Into<FileInput>,
    metadata: Option<serde_json::Value>,
) -> Result<Document, BigRagError> {
    let file_part = Into::<FileInput>::into(file)
        .into_multipart_part()
        .await?;
    let base_form = Form::new().part("file", file_part);
    // The metadata field is only attached when the caller provided a value.
    let multipart = match metadata {
        Some(value) => base_form.text("metadata", serde_json::to_string(&value)?),
        None => base_form,
    };
    let endpoint = format!("/v1/collections/{}/documents", urlencode(collection));
    self.client
        .transport
        .post_multipart(&endpoint, multipart)
        .await
}
/// Uploads several files to `collection` in one multipart request.
///
/// Each entry of `files` is turned into a multipart part and appended under
/// the repeated field name `files`, in the order given. When `metadata` is
/// supplied it is serialized with `serde_json::to_string` and attached as
/// the `metadata` text field. The form is passed to the transport's
/// `post_multipart` for `/v1/collections/{collection}/documents/batch/upload`.
///
/// # Errors
///
/// Stops at the first file whose multipart part cannot be built, and also
/// propagates metadata serialization and transport failures.
pub async fn batch_upload(
    &self,
    collection: &str,
    files: Vec<FileInput>,
    metadata: Option<serde_json::Value>,
) -> Result<DocumentListResponse, BigRagError> {
    let mut multipart = Form::new();
    for input in files {
        multipart = multipart.part("files", input.into_multipart_part().await?);
    }
    // The metadata field is only attached when the caller provided a value.
    let multipart = match metadata {
        Some(value) => multipart.text("metadata", serde_json::to_string(&value)?),
        None => multipart,
    };
    let endpoint = format!(
        "/v1/collections/{}/documents/batch/upload",
        urlencode(collection)
    );
    self.client
        .transport
        .post_multipart(&endpoint, multipart)
        .await
}
pub async fn list(
&self,
collection: &str,
options: Option<DocumentListOptions>,
) -> Result<DocumentListResponse, BigRagError> {
let mut query = Vec::new();
if let Some(opts) = options {
if let Some(status) = opts.status {
query.push(("status".into(), status));
}
if let Some(limit) = opts.limit {
query.push(("limit".into(), limit.to_string()));
}
if let Some(offset) = opts.offset {
query.push(("offset".into(), offset.to_string()));
}
}
let path = format!("/v1/collections/{}/documents", urlencode(collection));
self.client.transport.get(&path, query).await
}
/// Fetches one document of a collection.
///
/// Calls the transport's `get`, with no query pairs, on
/// `/v1/collections/{collection}/documents/{document_id}`; both path
/// segments are run through `urlencode`.
///
/// # Errors
///
/// Returns whatever `BigRagError` the transport call produces.
pub async fn get(
    &self,
    collection: &str,
    document_id: &str,
) -> Result<Document, BigRagError> {
    let collection_segment = urlencode(collection);
    let document_segment = urlencode(document_id);
    let endpoint = format!(
        "/v1/collections/{}/documents/{}",
        collection_segment, document_segment
    );
    self.client.transport.get(&endpoint, Vec::new()).await
}
/// Requests deletion of one document of a collection.
///
/// Calls the transport's `delete` on
/// `/v1/collections/{collection}/documents/{document_id}`; both path
/// segments are run through `urlencode`.
///
/// # Errors
///
/// Returns whatever `BigRagError` the transport call produces.
pub async fn delete(
    &self,
    collection: &str,
    document_id: &str,
) -> Result<StatusResponse, BigRagError> {
    let collection_segment = urlencode(collection);
    let document_segment = urlencode(document_id);
    let endpoint = format!(
        "/v1/collections/{}/documents/{}",
        collection_segment, document_segment
    );
    self.client.transport.delete(&endpoint).await
}
/// Requests reprocessing of one document of a collection.
///
/// Calls the transport's `post` on
/// `/v1/collections/{collection}/documents/{document_id}/reprocess` with
/// `serde_json::Value::Null` as the body; both path segments are run
/// through `urlencode`.
///
/// # Errors
///
/// Returns whatever `BigRagError` the transport call produces.
pub async fn reprocess(
    &self,
    collection: &str,
    document_id: &str,
) -> Result<StatusResponse, BigRagError> {
    let collection_segment = urlencode(collection);
    let document_segment = urlencode(document_id);
    let endpoint = format!(
        "/v1/collections/{}/documents/{}/reprocess",
        collection_segment, document_segment
    );
    // The endpoint takes no payload, so a JSON null is passed as the body.
    let empty_body = serde_json::Value::Null;
    self.client.transport.post(&endpoint, &empty_body).await
}
/// Fetches the chunks of one document of a collection.
///
/// Calls the transport's `get`, with no query pairs, on
/// `/v1/collections/{collection}/documents/{document_id}/chunks`; both path
/// segments are run through `urlencode`.
///
/// # Errors
///
/// Returns whatever `BigRagError` the transport call produces.
pub async fn get_chunks(
    &self,
    collection: &str,
    document_id: &str,
) -> Result<DocumentChunkListResponse, BigRagError> {
    let collection_segment = urlencode(collection);
    let document_segment = urlencode(document_id);
    let endpoint = format!(
        "/v1/collections/{}/documents/{}/chunks",
        collection_segment, document_segment
    );
    self.client.transport.get(&endpoint, Vec::new()).await
}
/// Builds the URL of a document's file endpoint without sending a request.
///
/// The result is the configured `base_url` followed by
/// `/v1/collections/{collection}/documents/{document_id}/file`, with both
/// path segments run through `urlencode`. When the client configuration
/// holds an API key, it is appended (also through `urlencode`) as the
/// `token` query parameter, so the returned string then contains the key.
pub fn get_file_url(&self, collection: &str, document_id: &str) -> String {
    let config = &self.client.config;
    let file_url = format!(
        "{}/v1/collections/{}/documents/{}/file",
        config.base_url,
        urlencode(collection),
        urlencode(document_id)
    );
    if let Some(key) = &config.api_key {
        format!("{}?token={}", file_url, urlencode(key))
    } else {
        file_url
    }
}
/// Queries the status of several documents of a collection in one call.
///
/// Calls the transport's `post` on
/// `/v1/collections/{collection}/documents/batch/status` with the JSON
/// body `{"document_ids": [...]}` built from `document_ids`.
///
/// # Errors
///
/// Returns whatever `BigRagError` the transport call produces.
pub async fn batch_get_status(
    &self,
    collection: &str,
    document_ids: &[&str],
) -> Result<BatchStatusResponse, BigRagError> {
    let payload = serde_json::json!({ "document_ids": document_ids });
    let endpoint = format!(
        "/v1/collections/{}/documents/batch/status",
        urlencode(collection)
    );
    self.client.transport.post(&endpoint, &payload).await
}
/// Fetches several documents of a collection in one call.
///
/// Calls the transport's `post` on
/// `/v1/collections/{collection}/documents/batch/get` with the JSON body
/// `{"document_ids": [...]}` built from `document_ids`.
///
/// # Errors
///
/// Returns whatever `BigRagError` the transport call produces.
pub async fn batch_get(
    &self,
    collection: &str,
    document_ids: &[&str],
) -> Result<BatchGetDocumentsResponse, BigRagError> {
    let payload = serde_json::json!({ "document_ids": document_ids });
    let endpoint = format!(
        "/v1/collections/{}/documents/batch/get",
        urlencode(collection)
    );
    self.client.transport.post(&endpoint, &payload).await
}
/// Requests deletion of several documents of a collection in one call.
///
/// Calls the transport's `post` on
/// `/v1/collections/{collection}/documents/batch/delete` with the JSON
/// body `{"document_ids": [...]}` built from `document_ids`.
///
/// # Errors
///
/// Returns whatever `BigRagError` the transport call produces.
pub async fn batch_delete(
    &self,
    collection: &str,
    document_ids: &[&str],
) -> Result<BatchDeleteDocumentsResponse, BigRagError> {
    let payload = serde_json::json!({ "document_ids": document_ids });
    let endpoint = format!(
        "/v1/collections/{}/documents/batch/delete",
        urlencode(collection)
    );
    self.client.transport.post(&endpoint, &payload).await
}
/// Submits an S3 ingestion request for `collection`.
///
/// Calls the transport's `post` on `/v1/collections/{collection}/documents/s3`
/// with `body` as the request payload.
///
/// # Errors
///
/// Returns whatever `BigRagError` the transport call produces.
pub async fn ingest_s3(
    &self,
    collection: &str,
    body: S3IngestBody,
) -> Result<S3IngestResponse, BigRagError> {
    let collection_segment = urlencode(collection);
    let endpoint = format!("/v1/collections/{}/documents/s3", collection_segment);
    self.client.transport.post(&endpoint, &body).await
}
/// Lists the S3 jobs of `collection`.
///
/// Calls the transport's `get`, with no query pairs, on
/// `/v1/collections/{collection}/s3-jobs`.
///
/// # Errors
///
/// Returns whatever `BigRagError` the transport call produces.
pub async fn list_s3_jobs(
    &self,
    collection: &str,
) -> Result<S3JobListResponse, BigRagError> {
    let collection_segment = urlencode(collection);
    let endpoint = format!("/v1/collections/{}/s3-jobs", collection_segment);
    self.client.transport.get(&endpoint, Vec::new()).await
}
/// Requests deletion of one S3 job of a collection.
///
/// Calls the transport's `delete` on
/// `/v1/collections/{collection}/s3-jobs/{job_id}`; both path segments are
/// run through `urlencode`.
///
/// # Errors
///
/// Returns whatever `BigRagError` the transport call produces.
pub async fn delete_s3_job(
    &self,
    collection: &str,
    job_id: &str,
) -> Result<StatusResponse, BigRagError> {
    let collection_segment = urlencode(collection);
    let job_segment = urlencode(job_id);
    let endpoint = format!(
        "/v1/collections/{}/s3-jobs/{}",
        collection_segment, job_segment
    );
    self.client.transport.delete(&endpoint).await
}
/// Requests a resync of one S3 job of a collection.
///
/// Calls the transport's `post` on
/// `/v1/collections/{collection}/s3-jobs/{job_id}/resync` with
/// `serde_json::Value::Null` as the body; both path segments are run
/// through `urlencode`.
///
/// # Errors
///
/// Returns whatever `BigRagError` the transport call produces.
pub async fn resync_s3_job(
    &self,
    collection: &str,
    job_id: &str,
) -> Result<StatusResponse, BigRagError> {
    let collection_segment = urlencode(collection);
    let job_segment = urlencode(job_id);
    let endpoint = format!(
        "/v1/collections/{}/s3-jobs/{}/resync",
        collection_segment, job_segment
    );
    // The endpoint takes no payload, so a JSON null is passed as the body.
    let empty_body = serde_json::Value::Null;
    self.client.transport.post(&endpoint, &empty_body).await
}
/// Fetches a document by id alone, without naming its collection.
///
/// Calls the transport's `get`, with no query pairs, on
/// `/v1/documents/{document_id}`.
///
/// # Errors
///
/// Returns whatever `BigRagError` the transport call produces.
pub async fn get_by_id(&self, document_id: &str) -> Result<Document, BigRagError> {
    let document_segment = urlencode(document_id);
    let endpoint = format!("/v1/documents/{}", document_segment);
    self.client.transport.get(&endpoint, Vec::new()).await
}
/// Fetches a document's chunks by document id alone, without naming its
/// collection.
///
/// Calls the transport's `get`, with no query pairs, on
/// `/v1/documents/{document_id}/chunks`.
///
/// # Errors
///
/// Returns whatever `BigRagError` the transport call produces.
pub async fn get_chunks_by_id(
    &self,
    document_id: &str,
) -> Result<DocumentChunkListResponse, BigRagError> {
    let document_segment = urlencode(document_id);
    let endpoint = format!("/v1/documents/{}/chunks", document_segment);
    self.client.transport.get(&endpoint, Vec::new()).await
}
/// Opens a progress stream for one document of a collection.
///
/// Calls the transport's `get_stream` on
/// `/v1/collections/{collection}/documents/{document_id}/progress` and wraps
/// the response in an `SseStream`; both path segments are run through
/// `urlencode`.
///
/// # Errors
///
/// Propagates the error from the transport's `get_stream` call.
pub async fn stream_progress(
    &self,
    collection: &str,
    document_id: &str,
) -> Result<SseStream, BigRagError> {
    let collection_segment = urlencode(collection);
    let document_segment = urlencode(document_id);
    let endpoint = format!(
        "/v1/collections/{}/documents/{}/progress",
        collection_segment, document_segment
    );
    let events = SseStream::new(self.client.transport.get_stream(&endpoint).await?);
    Ok(events)
}
/// Opens a progress stream covering several documents of a collection.
///
/// Each entry of `document_ids` is run through `urlencode`, and the results
/// are joined with `,` to form the `ids` query parameter of
/// `/v1/collections/{collection}/documents/batch/progress`. That path is
/// passed to the transport's `get_stream` and the response is wrapped in an
/// `SseStream`.
///
/// # Errors
///
/// Propagates the error from the transport's `get_stream` call.
pub async fn stream_batch_progress(
    &self,
    collection: &str,
    document_ids: &[&str],
) -> Result<SseStream, BigRagError> {
    // Ids are encoded one by one before joining, so the "," separators
    // themselves are never passed through `urlencode`.
    let encoded_ids: Vec<_> = document_ids.iter().map(|id| urlencode(id)).collect();
    let endpoint = format!(
        "/v1/collections/{}/documents/batch/progress?ids={}",
        urlencode(collection),
        encoded_ids.join(",")
    );
    let events = SseStream::new(self.client.transport.get_stream(&endpoint).await?);
    Ok(events)
}
}