use mnm_core::provenance::Provenance;
use mnm_core::types::{Document, DocumentKind};
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use time::OffsetDateTime;
use uuid::Uuid;
use crate::error::Result;
/// Borrowed input for [`insert`]: the values written into one new `document` row.
///
/// [`insert`] binds every field to the `document` column of the same name; `kind`
/// is first mapped to a text tag and `provenance` is first serialized to JSON.
#[derive(Debug, Clone)]
pub struct NewDocument<'a> {
    pub source_version_id: Uuid,
    pub node_id: Uuid,
    /// Stored as `"markdown"`, `"code"` or `"plaintext"`.
    pub kind: DocumentKind,
    pub source_url: Option<&'a str>,
    pub published_url: Option<&'a str>,
    pub source_path: &'a str,
    pub language: Option<&'a str>,
    pub content_hash: &'a str,
    pub source_modified_at: Option<OffsetDateTime>,
    /// Bound as a JSON value; `None` is bound as SQL `NULL`.
    pub frontmatter: Option<serde_json::Value>,
    /// Serialized with `serde_json` before insertion; a serialization failure makes
    /// [`insert`] return `StoreError::Json`.
    pub provenance: &'a Provenance,
    pub package_id: Option<Uuid>,
    pub char_count: i32,
    pub token_count: i32,
}
pub async fn insert(pool: &PgPool, doc: NewDocument<'_>) -> Result<Uuid> {
let kind_str = match doc.kind {
DocumentKind::Markdown => "markdown",
DocumentKind::Code => "code",
DocumentKind::Plaintext => "plaintext",
};
let provenance_json = serde_json::to_value(doc.provenance)
.map_err(|e| crate::error::StoreError::Json(e.to_string()))?;
let row: (Uuid,) = sqlx::query_as(
"INSERT INTO document ( \
source_version_id, node_id, kind, source_url, published_url, source_path, language, \
content_hash, source_modified_at, frontmatter, provenance, package_id, char_count, token_count \
) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14) RETURNING id",
)
.bind(doc.source_version_id)
.bind(doc.node_id)
.bind(kind_str)
.bind(doc.source_url)
.bind(doc.published_url)
.bind(doc.source_path)
.bind(doc.language)
.bind(doc.content_hash)
.bind(doc.source_modified_at)
.bind(doc.frontmatter)
.bind(provenance_json)
.bind(doc.package_id)
.bind(doc.char_count)
.bind(doc.token_count)
.fetch_one(pool)
.await?;
Ok(row.0)
}
pub async fn get_by_id(pool: &PgPool, id: Uuid) -> Result<Document> {
let row = sqlx::query_as::<_, DocumentRow>(
"SELECT id, source_version_id, node_id, kind, source_url, published_url, source_path, \
language, content_hash, source_modified_at, frontmatter, provenance, \
package_id, char_count, token_count, created_at \
FROM document WHERE id = $1",
)
.bind(id)
.fetch_one(pool)
.await?;
row.try_into()
}
/// Builds a [`DocumentOverview`] for document `id`.
///
/// The overview holds the document itself, the slug and display name of the source
/// owning its source version, and a content-free skeleton of its chunks. Chunks
/// with status `'embed_failed'` are skipped, and the rest are ordered by ascending
/// `chunk_index`.
///
/// # Errors
/// Propagates errors from [`get_by_id`] and from the source and chunk queries.
pub async fn get_overview(pool: &PgPool, id: Uuid) -> Result<DocumentOverview> {
    let document = get_by_id(pool, id).await?;

    let source_row: (String, String) = sqlx::query_as(
        "SELECT s.slug, s.display_name FROM source s \
         JOIN source_version sv ON sv.source_id = s.id \
         WHERE sv.id = $1",
    )
    .bind(document.source_version_id)
    .fetch_one(pool)
    .await?;
    let (slug, display_name) = source_row;
    let source = crate::entities::chunk::SourceSummary { slug, display_name };

    let chunks: Vec<ChunkSkeleton> = sqlx::query_as(
        "SELECT id, chunk_index, token_count FROM chunk \
         WHERE document_id = $1 AND status <> 'embed_failed' \
         ORDER BY chunk_index ASC",
    )
    .bind(id)
    .fetch_all(pool)
    .await?;

    Ok(DocumentOverview { document, source, chunks })
}
/// Fetches one page ("window") of a document's chunk bodies.
///
/// The result also carries the document itself, its source summary and the total
/// number of chunks the page is drawn from.
///
/// Chunks with status `'embed_failed'` are excluded from both the page and
/// `total_chunks`. Chunks are ordered by ascending `chunk_index`.
///
/// * `from`: number of (non-failed) chunks to skip. This is an offset, not a
///   `chunk_index`.
/// * `limit`: requested page size, clamped to `1..=100`. The clamped value is
///   echoed back in the returned window.
///
/// # Errors
/// Propagates errors from [`get_by_id`] (including a missing document) and from the
/// source, count and page queries.
pub async fn list_chunks_window(
    pool: &PgPool,
    id: Uuid,
    from: usize,
    limit: usize,
) -> Result<DocumentChunkWindow> {
    // Largest number of chunk bodies a single window may return.
    const MAX_WINDOW: usize = 100;

    let limit = limit.clamp(1, MAX_WINDOW);
    // OFFSET is bound as a bigint. An offset too large for i64 can only point past
    // the last chunk, so saturate. Falling back to 0 would silently return the
    // first page while echoing the huge `from` to the caller.
    let offset = i64::try_from(from).unwrap_or(i64::MAX);
    // Cannot fail: `limit` is at most MAX_WINDOW after clamping.
    let page_size = i64::try_from(limit).unwrap_or(i64::MAX);

    let document = get_by_id(pool, id).await?;
    let (source_slug, source_display_name) = sqlx::query_as::<_, (String, String)>(
        "SELECT s.slug, s.display_name FROM source s \
         JOIN source_version sv ON sv.source_id = s.id \
         WHERE sv.id = $1",
    )
    .bind(document.source_version_id)
    .fetch_one(pool)
    .await?;
    // Same status filter as the page query, so `total_chunks` describes the set
    // that `from`/`limit` index into.
    let total: i64 = sqlx::query_scalar(
        "SELECT COUNT(*) FROM chunk WHERE document_id = $1 AND status <> 'embed_failed'",
    )
    .bind(id)
    .fetch_one(pool)
    .await?;
    let chunks = sqlx::query_as::<_, ChunkBodyRow>(
        "SELECT id AS chunk_id, chunk_index, content, heading_path, token_count \
         FROM chunk \
         WHERE document_id = $1 AND status <> 'embed_failed' \
         ORDER BY chunk_index ASC \
         OFFSET $2 LIMIT $3",
    )
    .bind(id)
    .bind(offset)
    .bind(page_size)
    .fetch_all(pool)
    .await?
    .into_iter()
    .map(|r| ChunkBody {
        chunk_id: r.chunk_id,
        chunk_index: r.chunk_index,
        content: r.content,
        heading_path: r.heading_path,
        token_count: r.token_count,
    })
    .collect();

    Ok(DocumentChunkWindow {
        document,
        source: crate::entities::chunk::SourceSummary {
            slug: source_slug,
            display_name: source_display_name,
        },
        chunks,
        from,
        limit,
        // COUNT(*) is never negative, so conversion can only fail by exceeding
        // usize (32-bit targets). Saturate rather than reporting zero chunks.
        total_chunks: usize::try_from(total).unwrap_or(usize::MAX),
    })
}
/// Lightweight listing entry for one document, as returned by
/// [`list_for_source_version`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DocumentSummary {
    /// The `document.id` of the row.
    pub id: Uuid,
    pub source_path: String,
    pub content_hash: String,
}
/// A document together with its source summary and the skeleton of its chunks,
/// as built by [`get_overview`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct DocumentOverview {
    /// Flattened on (de)serialization: the document's fields appear at the top
    /// level, next to `source` and `chunks`.
    #[serde(flatten)]
    pub document: Document,
    /// Slug and display name of the source the document's source version belongs to.
    pub source: crate::entities::chunk::SourceSummary,
    /// Chunk ids, indices and token counts (no content). [`get_overview`] fills this
    /// with chunks whose status is not `'embed_failed'`, in ascending `chunk_index`.
    pub chunks: Vec<ChunkSkeleton>,
}
/// Content-free per-chunk metadata, decoded directly from `chunk` rows via
/// `sqlx::FromRow` (columns `id`, `chunk_index`, `token_count`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, sqlx::FromRow)]
pub struct ChunkSkeleton {
    pub id: Uuid,
    pub chunk_index: i32,
    pub token_count: i32,
}
/// A chunk's content plus its metadata, as carried in a [`DocumentChunkWindow`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ChunkBody {
    /// The `chunk.id` of the row (selected under the alias `chunk_id`).
    pub chunk_id: Uuid,
    pub chunk_index: i32,
    pub content: String,
    /// The `heading_path` column, read as a list of strings.
    pub heading_path: Vec<String>,
    pub token_count: i32,
}
/// One page of a document's chunk bodies, as built by [`list_chunks_window`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct DocumentChunkWindow {
    /// Flattened on (de)serialization: the document's fields appear at the top level.
    #[serde(flatten)]
    pub document: Document,
    /// Slug and display name of the source the document's source version belongs to.
    pub source: crate::entities::chunk::SourceSummary,
    /// The chunk bodies of this page, in ascending `chunk_index` order.
    pub chunks: Vec<ChunkBody>,
    /// Number of chunks skipped before this page (the offset the caller asked for).
    pub from: usize,
    /// Page size actually applied, after [`list_chunks_window`] clamps the request
    /// to `1..=100`.
    pub limit: usize,
    /// Count of the document's chunks whose status is not `'embed_failed'`.
    pub total_chunks: usize,
}
/// Per-document inventory entry returned by [`list_active_inventory`].
#[derive(Debug, Clone)]
pub struct InventoryDoc {
    pub source_path: String,
    pub content_hash: String,
    /// The `document.id` of this entry.
    pub document_id: Uuid,
    /// `true` when the document has at least one chunk and none of its chunks has
    /// status `'embed_failed'`, as computed in [`list_active_inventory`].
    pub embed_complete: bool,
}
/// Lists every document of `source_version_id`, sorted by `source_path`, with an
/// embedding-completeness flag.
///
/// The flag `embed_complete` is true iff the document has at least one chunk and
/// none of its chunks has status `'embed_failed'`. Documents without chunks are
/// still listed, via the LEFT JOIN, with the flag `false`.
///
/// # Errors
/// Propagates any database error from the query.
pub async fn list_active_inventory(
    pool: &PgPool,
    source_version_id: Uuid,
) -> Result<Vec<InventoryDoc>> {
    let fetched = sqlx::query_as::<_, (String, String, Uuid, bool)>(
        "SELECT d.source_path, d.content_hash, d.id, \
         (COUNT(c.id) > 0 AND COUNT(c.id) FILTER (WHERE c.status = 'embed_failed') = 0) \
         FROM document d \
         LEFT JOIN chunk c ON c.document_id = d.id \
         WHERE d.source_version_id = $1 \
         GROUP BY d.id, d.source_path, d.content_hash \
         ORDER BY d.source_path",
    )
    .bind(source_version_id)
    .fetch_all(pool)
    .await?;

    let mut inventory = Vec::with_capacity(fetched.len());
    for (source_path, content_hash, document_id, embed_complete) in fetched {
        inventory.push(InventoryDoc {
            source_path,
            content_hash,
            document_id,
            embed_complete,
        });
    }
    Ok(inventory)
}
/// Lists every document belonging to `source_version_id` as a [`DocumentSummary`]
/// (`id`, `source_path`, `content_hash`), sorted by `source_path`.
///
/// # Errors
/// Propagates any database error from the query.
pub async fn list_for_source_version(
    pool: &PgPool,
    source_version_id: Uuid,
) -> Result<Vec<DocumentSummary>> {
    let summaries: Vec<DocumentSummary> = sqlx::query_as::<_, (Uuid, String, String)>(
        "SELECT id, source_path, content_hash FROM document WHERE source_version_id = $1 \
         ORDER BY source_path",
    )
    .bind(source_version_id)
    .fetch_all(pool)
    .await?
    .into_iter()
    .map(|(id, source_path, content_hash)| DocumentSummary {
        id,
        source_path,
        content_hash,
    })
    .collect();

    Ok(summaries)
}
/// Looks up a document in `source_version_id` whose `content_hash` equals
/// `content_hash`, returning its `id` or `None` if there is no such document.
///
/// Several documents in the same source version may share a hash. In that case
/// the earliest-created one is returned, with `id` breaking ties. Without this
/// ordering, PostgreSQL may return any matching row, and the answer could change
/// between calls.
///
/// # Errors
/// Propagates any database error from the query.
pub async fn find_by_hash(
    pool: &PgPool,
    source_version_id: Uuid,
    content_hash: &str,
) -> Result<Option<Uuid>> {
    let found: Option<Uuid> = sqlx::query_scalar(
        "SELECT id FROM document WHERE source_version_id = $1 AND content_hash = $2 \
         ORDER BY created_at, id LIMIT 1",
    )
    .bind(source_version_id)
    .bind(content_hash)
    .fetch_optional(pool)
    .await?;
    Ok(found)
}
/// Raw `document` row as selected by [`get_by_id`].
///
/// `kind` and `provenance` are still undecoded here; the `TryFrom<DocumentRow> for
/// Document` impl turns them into their typed forms.
#[derive(sqlx::FromRow)]
struct DocumentRow {
    id: Uuid,
    source_version_id: Uuid,
    node_id: Uuid,
    /// Text tag for [`DocumentKind`]; decoded through serde as a JSON string.
    kind: String,
    source_url: Option<String>,
    published_url: Option<String>,
    source_path: String,
    language: Option<String>,
    content_hash: String,
    source_modified_at: Option<OffsetDateTime>,
    frontmatter: Option<serde_json::Value>,
    /// JSON form of [`Provenance`]; deserialized with serde.
    provenance: serde_json::Value,
    package_id: Option<Uuid>,
    char_count: i32,
    token_count: i32,
    created_at: OffsetDateTime,
}
/// Row shape of the page query in [`list_chunks_window`]; mapped field-for-field
/// into [`ChunkBody`].
#[derive(sqlx::FromRow)]
struct ChunkBodyRow {
    /// `chunk.id`, selected as `id AS chunk_id`.
    chunk_id: Uuid,
    chunk_index: i32,
    content: String,
    heading_path: Vec<String>,
    token_count: i32,
}
impl TryFrom<DocumentRow> for Document {
type Error = crate::error::StoreError;
fn try_from(r: DocumentRow) -> std::result::Result<Self, Self::Error> {
let kind: DocumentKind = serde_json::from_value(serde_json::Value::String(r.kind))
.map_err(|e| crate::error::StoreError::Json(e.to_string()))?;
let provenance: Provenance = serde_json::from_value(r.provenance)
.map_err(|e| crate::error::StoreError::Json(e.to_string()))?;
Ok(Self {
id: r.id,
source_version_id: r.source_version_id,
node_id: r.node_id,
kind,
source_url: r.source_url,
published_url: r.published_url,
source_path: r.source_path,
language: r.language,
content_hash: r.content_hash,
source_modified_at: r.source_modified_at,
frontmatter: r.frontmatter,
provenance,
package_id: r.package_id,
char_count: r.char_count,
token_count: r.token_count,
created_at: r.created_at,
})
}
}