Skip to main content

typub_storage/status/
tracker.rs

1//! Status tracker for managing publish status.
2//!
3//! SQLite-backed status tracking for content publishing.
4
5use super::types::{AssetUploadRecord, PlatformStatus, PublishResult};
6use anyhow::{Context, Result};
7use chrono::Utc;
8use rusqlite::{Connection, OptionalExtension, params};
9use std::collections::HashMap;
10use std::path::{Path, PathBuf};
11use typub_core::Content;
12
/// Status tracker for managing publish status.
///
/// Wraps a single SQLite connection plus the project root used to normalize
/// asset paths before they are stored.
pub struct StatusTracker {
    // Open SQLite connection to the status database.
    conn: Connection,
    // Filesystem location of the database file (kept for API compatibility;
    // see `save`, which only touches this field).
    path: PathBuf,
    /// Project root for path normalization per [[RFC-0005:C-PROJECT-ROOT]]
    project_root: PathBuf,
}
20
21impl StatusTracker {
22    /// Load status with default configuration.
23    /// Uses current directory as project root.
24    pub fn load_default() -> Result<Self> {
25        Self::load(Path::new("."))
26    }
27
28    /// Load status from disk
29    ///
30    /// # Arguments
31    /// * `project_root` - The project root directory for path normalization
32    pub fn load(project_root: &Path) -> Result<Self> {
33        let status_dir = PathBuf::from(".typub");
34        std::fs::create_dir_all(&status_dir)?;
35        Self::open_at(status_dir.join("status.db"), project_root)
36    }
37
38    fn open_at(path: PathBuf, project_root: &Path) -> Result<Self> {
39        let conn = Connection::open(&path)
40            .with_context(|| format!("Failed to open status DB: {}", path.display()))?;
41        conn.execute_batch(
42            r#"
43CREATE TABLE IF NOT EXISTS posts (
44    slug TEXT PRIMARY KEY
45);
46
47CREATE TABLE IF NOT EXISTS platform_status (
48    slug TEXT NOT NULL,
49    platform TEXT NOT NULL,
50    published INTEGER NOT NULL,
51    url TEXT,
52    platform_id TEXT,
53    published_at TEXT,
54    content_hash TEXT,
55    PRIMARY KEY (slug, platform)
56);
57
58CREATE INDEX IF NOT EXISTS idx_platform_status_slug ON platform_status(slug);
59CREATE INDEX IF NOT EXISTS idx_platform_status_platform ON platform_status(platform);
60
61CREATE TABLE IF NOT EXISTS publish_reconcile (
62    id INTEGER PRIMARY KEY AUTOINCREMENT,
63    slug TEXT NOT NULL,
64    platform TEXT NOT NULL,
65    remote_url TEXT,
66    remote_id TEXT,
67    error TEXT NOT NULL,
68    created_at TEXT NOT NULL
69);
70
71CREATE INDEX IF NOT EXISTS idx_publish_reconcile_slug_platform ON publish_reconcile(slug, platform);
72
73-- Asset upload tracking per [[RFC-0004:C-UPLOAD-TRACKING]]
74-- Two-index model: content_index and path_index
75
76CREATE TABLE IF NOT EXISTS asset_uploads (
77    id INTEGER PRIMARY KEY AUTOINCREMENT,
78    local_path TEXT NOT NULL,
79    storage_config_id TEXT NOT NULL,
80    content_hash TEXT NOT NULL,
81    extension TEXT NOT NULL,
82    remote_key TEXT NOT NULL,
83    remote_url TEXT NOT NULL,
84    uploaded_at TEXT NOT NULL,
85    UNIQUE(storage_config_id, content_hash, extension)
86);
87
88CREATE INDEX IF NOT EXISTS idx_asset_uploads_content
89    ON asset_uploads(storage_config_id, content_hash, extension);
90CREATE INDEX IF NOT EXISTS idx_asset_uploads_path
91    ON asset_uploads(local_path, storage_config_id);
92"#,
93        )
94        .context("Failed to initialize status DB schema")?;
95
96        // Migration: add remote_status column per [[RFC-0005:C-STATUS-TRACKING]]
97        // This is safe to run multiple times (SQLite ignores if column exists)
98        let _ = conn.execute(
99            "ALTER TABLE platform_status ADD COLUMN remote_status TEXT",
100            [],
101        );
102
103        let project_root = project_root
104            .canonicalize()
105            .unwrap_or_else(|_| project_root.to_path_buf());
106        Ok(Self {
107            conn,
108            path,
109            project_root,
110        })
111    }
112
113    /// Open a status tracker for testing purposes.
114    #[cfg(test)]
115    pub fn open_for_test(path: PathBuf, project_root: &Path) -> Result<Self> {
116        Self::open_at(path, project_root)
117    }
118
119    /// Get the project root for path normalization.
120    pub fn project_root(&self) -> &Path {
121        &self.project_root
122    }
123
124    /// Normalize a local path to relative format for storage.
125    ///
126    /// Per [[RFC-0005:C-PROJECT-ROOT]], paths stored in the database MUST be:
127    /// - Relative to project root
128    /// - Using forward slashes (OS-agnostic)
129    /// - Without `./` prefix or `..` components
130    pub fn normalize_path(&self, local_path: &Path) -> Result<String> {
131        normalize_to_relative(local_path, &self.project_root)
132    }
133
134    /// Save status to disk
135    pub fn save(&self) -> Result<()> {
136        // SQLite writes happen transactionally in mutating methods.
137        // Keep this method for API compatibility.
138        let _ = &self.path;
139        Ok(())
140    }
141
142    // =========================================================================
143    // Platform Status Methods
144    // =========================================================================
145
146    fn load_platform_status(&self, slug: &str, platform: &str) -> Result<Option<PlatformStatus>> {
147        self.conn
148            .query_row(
149                r#"
150SELECT published, url, platform_id, published_at, content_hash, remote_status
151FROM platform_status
152WHERE slug = ?1 AND platform = ?2
153"#,
154                params![slug, platform],
155                |row| {
156                    let published: i64 = row.get(0)?;
157                    let url: Option<String> = row.get(1)?;
158                    let platform_id: Option<String> = row.get(2)?;
159                    let published_at_str: Option<String> = row.get(3)?;
160                    let content_hash: Option<String> = row.get(4)?;
161                    let remote_status: Option<String> = row.get(5)?;
162
163                    let last_publish = match published_at_str {
164                        Some(ts) => {
165                            let parsed = chrono::DateTime::parse_from_rfc3339(&ts)
166                                .map_err(|e| {
167                                    rusqlite::Error::FromSqlConversionFailure(
168                                        3,
169                                        rusqlite::types::Type::Text,
170                                        Box::new(e),
171                                    )
172                                })?
173                                .with_timezone(&Utc);
174                            Some(PublishResult {
175                                url,
176                                platform_id,
177                                published_at: parsed,
178                            })
179                        }
180                        None => None,
181                    };
182
183                    Ok(PlatformStatus {
184                        published: published != 0,
185                        last_publish,
186                        content_hash,
187                        remote_status,
188                    })
189                },
190            )
191            .optional()
192            .map_err(Into::into)
193    }
194
195    /// Get previously published URL for a post+platform.
196    pub fn get_published_url(&self, slug: &str, platform: &str) -> Result<Option<String>> {
197        self.load_platform_status(slug, platform).map(|o| {
198            o.and_then(|s| {
199                if s.published {
200                    s.last_publish.and_then(|p| p.url)
201                } else {
202                    None
203                }
204            })
205        })
206    }
207
208    /// Get the first published URL for a post across all platforms.
209    ///
210    /// Returns the platform with the lowest alphabetical ID that has a published URL.
211    /// Used by copypaste adapters for auto-selecting internal link targets.
212    pub fn get_first_published_url(&self, slug: &str) -> Result<Option<(String, String)>> {
213        self.conn
214            .query_row(
215                r#"
216SELECT platform, url
217FROM platform_status
218WHERE slug = ?1 AND published = 1 AND url IS NOT NULL
219ORDER BY platform ASC
220LIMIT 1
221"#,
222                params![slug],
223                |row| Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?)),
224            )
225            .optional()
226            .map_err(Into::into)
227    }
228
229    /// Get previously stored platform-specific ID for a post+platform.
230    pub fn get_platform_id(&self, slug: &str, platform: &str) -> Result<Option<String>> {
231        self.conn
232            .query_row(
233                r#"
234SELECT platform_id
235FROM platform_status
236WHERE slug = ?1 AND platform = ?2 AND published = 1
237"#,
238                params![slug, platform],
239                |row| row.get::<_, Option<String>>(0),
240            )
241            .optional()
242            .map(|o| o.flatten())
243            .map_err(Into::into)
244    }
245
246    /// Get platform status for lifecycle determination.
247    /// Per [[RFC-0005:C-LIFECYCLE-TRANSITIONS]], used by pipeline to determine remote_status.
248    pub fn load_platform_status_internal(
249        &self,
250        slug: &str,
251        platform: &str,
252    ) -> Result<Option<PlatformStatus>> {
253        self.load_platform_status(slug, platform)
254    }
255
256    /// Get status for a post across all platforms.
257    /// Returns (published, last_url) for each platform.
258    ///
259    /// This queries the database directly for all platforms that have status
260    /// records for this slug, rather than being limited to platforms defined
261    /// in the content's meta.toml.
262    pub fn get_status(&self, content: &Content) -> HashMap<String, (bool, Option<String>)> {
263        let slug = content.slug();
264        let mut result = HashMap::new();
265
266        // Query all platforms from the database for this slug
267        let db_platforms = self.load_all_platform_statuses(slug).unwrap_or_default();
268        for (platform, status) in db_platforms {
269            let published = status.published;
270            let url = status.last_publish.and_then(|p| p.url);
271            result.insert(platform, (published, url));
272        }
273
274        // Also include platforms from meta.toml that may not have status yet
275        for platform in content.meta.platforms.keys() {
276            result.entry(platform.clone()).or_insert((false, None));
277        }
278
279        result
280    }
281
282    /// Load all platform statuses for a slug from the database.
283    fn load_all_platform_statuses(&self, slug: &str) -> Result<Vec<(String, PlatformStatus)>> {
284        let mut stmt = self.conn.prepare(
285            r#"
286SELECT platform, published, url, platform_id, published_at, content_hash, remote_status
287FROM platform_status
288WHERE slug = ?1
289"#,
290        )?;
291
292        let rows = stmt.query_map(params![slug], |row| {
293            let platform: String = row.get(0)?;
294            let published: i64 = row.get(1)?;
295            let url: Option<String> = row.get(2)?;
296            let platform_id: Option<String> = row.get(3)?;
297            let published_at_str: Option<String> = row.get(4)?;
298            let content_hash: Option<String> = row.get(5)?;
299            let remote_status: Option<String> = row.get(6)?;
300
301            let last_publish = match published_at_str {
302                Some(ts) => {
303                    let parsed = chrono::DateTime::parse_from_rfc3339(&ts).map_err(|e| {
304                        rusqlite::Error::FromSqlConversionFailure(
305                            3,
306                            rusqlite::types::Type::Text,
307                            Box::new(e),
308                        )
309                    })?;
310                    Some(PublishResult {
311                        url,
312                        platform_id,
313                        published_at: parsed.with_timezone(&chrono::Utc),
314                    })
315                }
316                None => None,
317            };
318
319            Ok((
320                platform,
321                PlatformStatus {
322                    published: published != 0,
323                    last_publish,
324                    content_hash,
325                    remote_status,
326                },
327            ))
328        })?;
329
330        rows.collect::<std::result::Result<Vec<_>, _>>()
331            .context("Failed to load platform statuses")
332    }
333
334    /// Mark a post as published to a platform.
335    ///
336    /// Per [[RFC-0005:C-STATUS-TRACKING]], stores the remote lifecycle state.
337    ///
338    /// # Arguments
339    /// * `remote_status` - "draft" or "published" for API-based platforms,
340    ///   or `None` for local output platforms.
341    pub fn mark_published(
342        &mut self,
343        content: &Content,
344        platform: &str,
345        result: &PublishResult,
346        remote_status: Option<&str>,
347    ) -> Result<()> {
348        let slug = content.slug().to_string();
349        let content_hash = self.compute_hash(content)?;
350        let tx = self
351            .conn
352            .transaction()
353            .context("Failed to start status transaction")?;
354        tx.execute(
355            "INSERT OR IGNORE INTO posts (slug) VALUES (?1)",
356            params![slug.as_str()],
357        )?;
358        tx.execute(
359            r#"
360INSERT INTO platform_status (slug, platform, published, url, platform_id, published_at, content_hash, remote_status)
361VALUES (?1, ?2, 1, ?3, ?4, ?5, ?6, ?7)
362ON CONFLICT(slug, platform) DO UPDATE SET
363    published = 1,
364    url = excluded.url,
365    platform_id = excluded.platform_id,
366    published_at = excluded.published_at,
367    content_hash = excluded.content_hash,
368    remote_status = excluded.remote_status
369"#,
370            params![
371                slug.as_str(),
372                platform,
373                result.url.as_deref(),
374                result.platform_id.as_deref(),
375                result.published_at.to_rfc3339(),
376                content_hash,
377                remote_status
378            ],
379        )?;
380        tx.commit().context("Failed to commit status transaction")?;
381
382        Ok(())
383    }
384
385    /// Record a reconciliation signal when remote publish succeeded
386    /// but local status persistence failed.
387    pub fn record_reconcile(
388        &self,
389        slug: &str,
390        platform: &str,
391        remote_id: Option<&str>,
392        remote_url: Option<&str>,
393        error: &str,
394    ) -> Result<()> {
395        self.conn.execute(
396            r#"
397INSERT INTO publish_reconcile (slug, platform, remote_url, remote_id, error, created_at)
398VALUES (?1, ?2, ?3, ?4, ?5, ?6)
399"#,
400            params![
401                slug,
402                platform,
403                remote_url,
404                remote_id,
405                error,
406                Utc::now().to_rfc3339()
407            ],
408        )?;
409        Ok(())
410    }
411
412    /// Check if content has changed since last publish
413    pub fn has_changed(&self, content: &Content, platform: &str) -> Result<bool> {
414        let slug = content.slug();
415
416        let current_hash = self.compute_hash(content)?;
417        let last_hash: Option<String> = self
418            .conn
419            .query_row(
420                "SELECT content_hash FROM platform_status WHERE slug = ?1 AND platform = ?2",
421                params![slug, platform],
422                |row| row.get(0),
423            )
424            .optional()?;
425
426        Ok(last_hash.as_deref() != Some(current_hash.as_str()))
427    }
428
429    /// Compute a simple hash of content for change detection
430    fn compute_hash(&self, content: &Content) -> Result<String> {
431        use std::collections::hash_map::DefaultHasher;
432        use std::hash::{Hash, Hasher};
433
434        let content_str = std::fs::read_to_string(&content.content_file)?;
435        let mut tags = content.meta.tags.clone();
436        tags.sort();
437        tags.dedup();
438
439        let mut hasher = DefaultHasher::new();
440        content_str.hash(&mut hasher);
441        content.meta.title.hash(&mut hasher);
442        tags.hash(&mut hasher);
443
444        Ok(format!("{:x}", hasher.finish()))
445    }
446
447    // =========================================================================
448    // Asset Upload Tracking per [[RFC-0004:C-UPLOAD-TRACKING]]
449    // =========================================================================
450
451    /// Look up cached asset URL by content index key.
452    /// Per [[RFC-0004:C-UPLOAD-TRACKING]] - content index lookup.
453    pub fn get_asset_by_content(
454        &self,
455        storage_config_id: &str,
456        content_hash: &str,
457        extension: &str,
458    ) -> Result<Option<AssetUploadRecord>> {
459        self.conn
460            .query_row(
461                r#"
462SELECT local_path, storage_config_id, content_hash, extension, remote_key, remote_url, uploaded_at
463FROM asset_uploads
464WHERE storage_config_id = ?1 AND content_hash = ?2 AND extension = ?3
465"#,
466                params![storage_config_id, content_hash, extension],
467                |row| {
468                    Ok(AssetUploadRecord {
469                        local_path: row.get(0)?,
470                        storage_config_id: row.get(1)?,
471                        content_hash: row.get(2)?,
472                        extension: row.get(3)?,
473                        remote_key: row.get(4)?,
474                        remote_url: row.get(5)?,
475                        uploaded_at: row.get(6)?,
476                    })
477                },
478            )
479            .optional()
480            .map_err(Into::into)
481    }
482
483    /// Look up cached asset by local path.
484    /// Per [[RFC-0004:C-UPLOAD-TRACKING]] - path index lookup.
485    pub fn get_asset_by_path(
486        &self,
487        local_path: &str,
488        storage_config_id: &str,
489    ) -> Result<Option<AssetUploadRecord>> {
490        self.conn
491            .query_row(
492                r#"
493SELECT local_path, storage_config_id, content_hash, extension, remote_key, remote_url, uploaded_at
494FROM asset_uploads
495WHERE local_path = ?1 AND storage_config_id = ?2
496ORDER BY uploaded_at DESC
497LIMIT 1
498"#,
499                params![local_path, storage_config_id],
500                |row| {
501                    Ok(AssetUploadRecord {
502                        local_path: row.get(0)?,
503                        storage_config_id: row.get(1)?,
504                        content_hash: row.get(2)?,
505                        extension: row.get(3)?,
506                        remote_key: row.get(4)?,
507                        remote_url: row.get(5)?,
508                        uploaded_at: row.get(6)?,
509                    })
510                },
511            )
512            .optional()
513            .map_err(Into::into)
514    }
515
516    /// Record a successful asset upload.
517    /// Per [[RFC-0004:C-UPLOAD-TRACKING]] - per-asset atomic persistence.
518    pub fn record_asset_upload(&self, record: &AssetUploadRecord) -> Result<()> {
519        self.conn.execute(
520            r#"
521INSERT INTO asset_uploads (local_path, storage_config_id, content_hash, extension, remote_key, remote_url, uploaded_at)
522VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
523ON CONFLICT(storage_config_id, content_hash, extension) DO UPDATE SET
524    local_path = excluded.local_path,
525    remote_key = excluded.remote_key,
526    remote_url = excluded.remote_url,
527    uploaded_at = excluded.uploaded_at
528"#,
529            params![
530                record.local_path,
531                record.storage_config_id,
532                record.content_hash,
533                record.extension,
534                record.remote_key,
535                record.remote_url,
536                record.uploaded_at
537            ],
538        )?;
539        Ok(())
540    }
541
542    /// List all asset uploads for assets under a given path prefix.
543    /// Per [[RFC-0004:C-UPLOAD-TRACKING]].
544    pub fn list_assets_by_prefix(&self, path_prefix: &str) -> Result<Vec<AssetUploadRecord>> {
545        let mut stmt = self.conn.prepare(
546            r#"
547SELECT local_path, storage_config_id, content_hash, extension, remote_key, remote_url, uploaded_at
548FROM asset_uploads
549WHERE local_path LIKE ?1 || '%'
550ORDER BY uploaded_at DESC
551"#,
552        )?;
553
554        let records = stmt
555            .query_map(params![path_prefix], |row| {
556                Ok(AssetUploadRecord {
557                    local_path: row.get(0)?,
558                    storage_config_id: row.get(1)?,
559                    content_hash: row.get(2)?,
560                    extension: row.get(3)?,
561                    remote_key: row.get(4)?,
562                    remote_url: row.get(5)?,
563                    uploaded_at: row.get(6)?,
564                })
565            })?
566            .collect::<std::result::Result<Vec<_>, _>>()?;
567
568        Ok(records)
569    }
570}
571
572fn normalize_to_relative(path: &Path, project_root: &Path) -> Result<String> {
573    let normalized = typub_config::project::normalize_to_relative(path, project_root)?;
574    Ok(normalized
575        .strip_prefix("./")
576        .unwrap_or(&normalized)
577        .to_string())
578}