1use mnm_core::provenance::Provenance;
4use mnm_core::types::{Document, DocumentKind};
5use serde::{Deserialize, Serialize};
6use sqlx::PgPool;
7use time::OffsetDateTime;
8use uuid::Uuid;
9
10use crate::error::Result;
11
12#[derive(Debug, Clone)]
15pub struct NewDocument<'a> {
16 pub source_version_id: Uuid,
18 pub node_id: Uuid,
20 pub kind: DocumentKind,
22 pub source_url: Option<&'a str>,
24 pub published_url: Option<&'a str>,
26 pub source_path: &'a str,
28 pub language: Option<&'a str>,
30 pub content_hash: &'a str,
32 pub source_modified_at: Option<OffsetDateTime>,
34 pub frontmatter: Option<serde_json::Value>,
36 pub provenance: &'a Provenance,
38 pub package_id: Option<Uuid>,
40 pub char_count: i32,
42 pub token_count: i32,
44}
45
46pub async fn insert(pool: &PgPool, doc: NewDocument<'_>) -> Result<Uuid> {
54 let kind_str = match doc.kind {
55 DocumentKind::Markdown => "markdown",
56 DocumentKind::Code => "code",
57 DocumentKind::Plaintext => "plaintext",
58 };
59 let provenance_json = serde_json::to_value(doc.provenance)
60 .map_err(|e| crate::error::StoreError::Json(e.to_string()))?;
61 let row: (Uuid,) = sqlx::query_as(
62 "INSERT INTO document ( \
63 source_version_id, node_id, kind, source_url, published_url, source_path, language, \
64 content_hash, source_modified_at, frontmatter, provenance, package_id, char_count, token_count \
65 ) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14) RETURNING id",
66 )
67 .bind(doc.source_version_id)
68 .bind(doc.node_id)
69 .bind(kind_str)
70 .bind(doc.source_url)
71 .bind(doc.published_url)
72 .bind(doc.source_path)
73 .bind(doc.language)
74 .bind(doc.content_hash)
75 .bind(doc.source_modified_at)
76 .bind(doc.frontmatter)
77 .bind(provenance_json)
78 .bind(doc.package_id)
79 .bind(doc.char_count)
80 .bind(doc.token_count)
81 .fetch_one(pool)
82 .await?;
83 Ok(row.0)
84}
85
86pub async fn get_by_id(pool: &PgPool, id: Uuid) -> Result<Document> {
92 let row = sqlx::query_as::<_, DocumentRow>(
93 "SELECT id, source_version_id, node_id, kind, source_url, published_url, source_path, \
94 language, content_hash, source_modified_at, frontmatter, provenance, \
95 package_id, char_count, token_count, created_at \
96 FROM document WHERE id = $1",
97 )
98 .bind(id)
99 .fetch_one(pool)
100 .await?;
101 row.try_into()
102}
103
104pub async fn get_overview(pool: &PgPool, id: Uuid) -> Result<DocumentOverview> {
111 let document = get_by_id(pool, id).await?;
112 let (source_slug, source_display_name) = sqlx::query_as::<_, (String, String)>(
113 "SELECT s.slug, s.display_name FROM source s \
114 JOIN source_version sv ON sv.source_id = s.id \
115 WHERE sv.id = $1",
116 )
117 .bind(document.source_version_id)
118 .fetch_one(pool)
119 .await?;
120 let chunks = sqlx::query_as::<_, ChunkSkeleton>(
121 "SELECT id, chunk_index, token_count FROM chunk \
122 WHERE document_id = $1 AND status <> 'embed_failed' \
123 ORDER BY chunk_index ASC",
124 )
125 .bind(id)
126 .fetch_all(pool)
127 .await?;
128 Ok(DocumentOverview {
129 document,
130 source: crate::entities::chunk::SourceSummary {
131 slug: source_slug,
132 display_name: source_display_name,
133 },
134 chunks,
135 })
136}
137
138pub async fn list_chunks_window(
148 pool: &PgPool,
149 id: Uuid,
150 from: usize,
151 limit: usize,
152) -> Result<DocumentChunkWindow> {
153 let limit = limit.clamp(1, 100);
154 let document = get_by_id(pool, id).await?;
155 let (source_slug, source_display_name) = sqlx::query_as::<_, (String, String)>(
156 "SELECT s.slug, s.display_name FROM source s \
157 JOIN source_version sv ON sv.source_id = s.id \
158 WHERE sv.id = $1",
159 )
160 .bind(document.source_version_id)
161 .fetch_one(pool)
162 .await?;
163 let total: i64 = sqlx::query_scalar(
164 "SELECT COUNT(*) FROM chunk WHERE document_id = $1 AND status <> 'embed_failed'",
165 )
166 .bind(id)
167 .fetch_one(pool)
168 .await?;
169 let chunks = sqlx::query_as::<_, ChunkBodyRow>(
170 "SELECT id AS chunk_id, chunk_index, content, heading_path, token_count \
171 FROM chunk \
172 WHERE document_id = $1 AND status <> 'embed_failed' \
173 ORDER BY chunk_index ASC \
174 OFFSET $2 LIMIT $3",
175 )
176 .bind(id)
177 .bind(i64::try_from(from).unwrap_or(0))
178 .bind(i64::try_from(limit).unwrap_or(20))
179 .fetch_all(pool)
180 .await?
181 .into_iter()
182 .map(|r| ChunkBody {
183 chunk_id: r.chunk_id,
184 chunk_index: r.chunk_index,
185 content: r.content,
186 heading_path: r.heading_path,
187 token_count: r.token_count,
188 })
189 .collect();
190 Ok(DocumentChunkWindow {
191 document,
192 source: crate::entities::chunk::SourceSummary {
193 slug: source_slug,
194 display_name: source_display_name,
195 },
196 chunks,
197 from,
198 limit,
199 total_chunks: usize::try_from(total).unwrap_or(0),
200 })
201}
202
203#[derive(Debug, Clone, PartialEq, Eq)]
208pub struct DocumentSummary {
209 pub id: Uuid,
211 pub source_path: String,
213 pub content_hash: String,
215}
216
217#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
222pub struct DocumentOverview {
223 #[serde(flatten)]
225 pub document: Document,
226 pub source: crate::entities::chunk::SourceSummary,
228 pub chunks: Vec<ChunkSkeleton>,
230}
231
232#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, sqlx::FromRow)]
234pub struct ChunkSkeleton {
235 pub id: Uuid,
237 pub chunk_index: i32,
239 pub token_count: i32,
241}
242
243#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
245pub struct ChunkBody {
246 pub chunk_id: Uuid,
248 pub chunk_index: i32,
250 pub content: String,
252 pub heading_path: Vec<String>,
254 pub token_count: i32,
256}
257
258#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
262pub struct DocumentChunkWindow {
263 #[serde(flatten)]
265 pub document: Document,
266 pub source: crate::entities::chunk::SourceSummary,
268 pub chunks: Vec<ChunkBody>,
270 pub from: usize,
272 pub limit: usize,
274 pub total_chunks: usize,
276}
277
278#[derive(Debug, Clone)]
280pub struct InventoryDoc {
281 pub source_path: String,
283 pub content_hash: String,
285 pub document_id: Uuid,
287 pub embed_complete: bool,
289}
290
291pub async fn list_active_inventory(
298 pool: &PgPool,
299 source_version_id: Uuid,
300) -> Result<Vec<InventoryDoc>> {
301 let rows: Vec<(String, String, Uuid, bool)> = sqlx::query_as(
302 "SELECT d.source_path, d.content_hash, d.id, \
303 (COUNT(c.id) > 0 AND COUNT(c.id) FILTER (WHERE c.status = 'embed_failed') = 0) \
304 FROM document d \
305 LEFT JOIN chunk c ON c.document_id = d.id \
306 WHERE d.source_version_id = $1 \
307 GROUP BY d.id, d.source_path, d.content_hash \
308 ORDER BY d.source_path",
309 )
310 .bind(source_version_id)
311 .fetch_all(pool)
312 .await?;
313 Ok(rows
314 .into_iter()
315 .map(|(source_path, content_hash, document_id, embed_complete)| InventoryDoc {
316 source_path,
317 content_hash,
318 document_id,
319 embed_complete,
320 })
321 .collect())
322}
323
324pub async fn list_for_source_version(
331 pool: &PgPool,
332 source_version_id: Uuid,
333) -> Result<Vec<DocumentSummary>> {
334 let rows: Vec<(Uuid, String, String)> = sqlx::query_as(
335 "SELECT id, source_path, content_hash FROM document WHERE source_version_id = $1 \
336 ORDER BY source_path",
337 )
338 .bind(source_version_id)
339 .fetch_all(pool)
340 .await?;
341 Ok(rows
342 .into_iter()
343 .map(|(id, source_path, content_hash)| DocumentSummary { id, source_path, content_hash })
344 .collect())
345}
346
347pub async fn find_by_hash(
357 pool: &PgPool,
358 source_version_id: Uuid,
359 content_hash: &str,
360) -> Result<Option<Uuid>> {
361 let row: Option<(Uuid,)> = sqlx::query_as(
362 "SELECT id FROM document WHERE source_version_id = $1 AND content_hash = $2",
363 )
364 .bind(source_version_id)
365 .bind(content_hash)
366 .fetch_optional(pool)
367 .await?;
368 Ok(row.map(|r| r.0))
369}
370
371#[derive(sqlx::FromRow)]
372struct DocumentRow {
373 id: Uuid,
374 source_version_id: Uuid,
375 node_id: Uuid,
376 kind: String,
377 source_url: Option<String>,
378 published_url: Option<String>,
379 source_path: String,
380 language: Option<String>,
381 content_hash: String,
382 source_modified_at: Option<OffsetDateTime>,
383 frontmatter: Option<serde_json::Value>,
384 provenance: serde_json::Value,
385 package_id: Option<Uuid>,
386 char_count: i32,
387 token_count: i32,
388 created_at: OffsetDateTime,
389}
390
391#[derive(sqlx::FromRow)]
392struct ChunkBodyRow {
393 chunk_id: Uuid,
394 chunk_index: i32,
395 content: String,
396 heading_path: Vec<String>,
397 token_count: i32,
398}
399
400impl TryFrom<DocumentRow> for Document {
401 type Error = crate::error::StoreError;
402
403 fn try_from(r: DocumentRow) -> std::result::Result<Self, Self::Error> {
404 let kind: DocumentKind = serde_json::from_value(serde_json::Value::String(r.kind))
405 .map_err(|e| crate::error::StoreError::Json(e.to_string()))?;
406 let provenance: Provenance = serde_json::from_value(r.provenance)
407 .map_err(|e| crate::error::StoreError::Json(e.to_string()))?;
408 Ok(Self {
409 id: r.id,
410 source_version_id: r.source_version_id,
411 node_id: r.node_id,
412 kind,
413 source_url: r.source_url,
414 published_url: r.published_url,
415 source_path: r.source_path,
416 language: r.language,
417 content_hash: r.content_hash,
418 source_modified_at: r.source_modified_at,
419 frontmatter: r.frontmatter,
420 provenance,
421 package_id: r.package_id,
422 char_count: r.char_count,
423 token_count: r.token_count,
424 created_at: r.created_at,
425 })
426 }
427}