Skip to main content

plexus_substrate/activations/changelog/
activation.rs

1use super::storage::{ChangelogStorage, ChangelogStorageConfig};
2use super::types::{ChangelogEntry, ChangelogEvent, QueueEntry};
3use async_stream::stream;
4use futures::Stream;
5use plexus_macros::hub_methods;
6use std::sync::Arc;
7
8/// Changelog activation - tracks Plexus RPC server hash changes and enforces documentation
9#[derive(Clone)]
10pub struct Changelog {
11    storage: Arc<ChangelogStorage>,
12}
13
14impl Changelog {
15    pub async fn new(config: ChangelogStorageConfig) -> Result<Self, String> {
16        let storage = ChangelogStorage::new(config).await?;
17        Ok(Self {
18            storage: Arc::new(storage),
19        })
20    }
21
22    /// Run startup check - called when Plexus RPC server starts
23    /// Returns (hash_changed, is_documented, message)
24    pub async fn startup_check(&self, current_hash: &str) -> Result<(bool, bool, String), String> {
25        let previous_hash = self.storage.get_last_hash().await?;
26
27        // Update the stored hash to current
28        self.storage.set_last_hash(current_hash).await?;
29
30        match previous_hash {
31            None => {
32                // First run - no previous hash
33                Ok((false, true, "First startup - no previous hash recorded".to_string()))
34            }
35            Some(prev) if prev == current_hash => {
36                // No change
37                Ok((false, true, "Plexus hash unchanged".to_string()))
38            }
39            Some(prev) => {
40                // Hash changed - check if documented
41                let is_documented = self.storage.is_documented(current_hash).await?;
42                let message = if is_documented {
43                    let entry = self.storage.get_entry(current_hash).await?.unwrap();
44                    format!(
45                        "Plexus changed: {} -> {} (documented: {})",
46                        prev, current_hash, entry.summary
47                    )
48                } else {
49                    format!(
50                        "UNDOCUMENTED PLEXUS CHANGE: {} -> {}. Add changelog entry for hash '{}'",
51                        prev, current_hash, current_hash
52                    )
53                };
54                Ok((true, is_documented, message))
55            }
56        }
57    }
58
59    /// Get the storage for direct access (used by builder for startup check)
60    pub fn storage(&self) -> &ChangelogStorage {
61        &self.storage
62    }
63}
64
65#[hub_methods(
66    namespace = "changelog",
67    version = "1.0.0",
68    description = "Track and document plexus configuration changes"
69)]
70impl Changelog {
71    /// Add a changelog entry for a plexus hash transition
72    #[plexus_macros::hub_method(description = "Add a changelog entry documenting a plexus hash change")]
73    async fn add(
74        &self,
75        hash: String,
76        summary: String,
77        previous_hash: Option<String>,
78        details: Option<Vec<String>>,
79        author: Option<String>,
80        queue_id: Option<String>,
81    ) -> impl Stream<Item = ChangelogEvent> + Send + 'static {
82        let storage = self.storage.clone();
83
84        stream! {
85            let mut entry = ChangelogEntry::new(hash.clone(), previous_hash, summary);
86            if let Some(d) = details {
87                entry = entry.with_details(d);
88            }
89            if let Some(a) = author {
90                entry = entry.with_author(a);
91            }
92            if let Some(q) = queue_id.clone() {
93                entry = entry.with_queue_id(q);
94            }
95
96            match storage.add_entry(&entry).await {
97                Ok(()) => {
98                    // If this completes a queue item, mark it complete
99                    if let Some(qid) = queue_id {
100                        if let Err(e) = storage.complete_queue_entry(&qid, &hash).await {
101                            tracing::warn!("Failed to complete queue entry {}: {}", qid, e);
102                        }
103                    }
104                    yield ChangelogEvent::EntryAdded { entry };
105                }
106                Err(e) => {
107                    tracing::error!("Failed to add changelog entry: {}", e);
108                }
109            }
110        }
111    }
112
113    /// List all changelog entries
114    #[plexus_macros::hub_method(description = "List all changelog entries (newest first)")]
115    async fn list(&self) -> impl Stream<Item = ChangelogEvent> + Send + 'static {
116        let storage = self.storage.clone();
117
118        stream! {
119            match storage.list_entries().await {
120                Ok(entries) => {
121                    yield ChangelogEvent::Entries { entries };
122                }
123                Err(e) => {
124                    tracing::error!("Failed to list changelog entries: {}", e);
125                }
126            }
127        }
128    }
129
130    /// Get a specific changelog entry by hash
131    #[plexus_macros::hub_method(description = "Get a changelog entry for a specific plexus hash")]
132    async fn get(
133        &self,
134        hash: String,
135    ) -> impl Stream<Item = ChangelogEvent> + Send + 'static {
136        let storage = self.storage.clone();
137
138        stream! {
139            match storage.get_entry(&hash).await {
140                Ok(entry) => {
141                    let is_documented = entry.is_some();
142                    let previous_hash = storage.get_last_hash().await.ok().flatten();
143                    yield ChangelogEvent::Status {
144                        current_hash: hash,
145                        previous_hash,
146                        is_documented,
147                        entry,
148                    };
149                }
150                Err(e) => {
151                    tracing::error!("Failed to get changelog entry: {}", e);
152                }
153            }
154        }
155    }
156
157    /// Check current status - is the current plexus hash documented?
158    #[plexus_macros::hub_method(description = "Check if the current plexus configuration is documented")]
159    async fn check(
160        &self,
161        current_hash: String,
162    ) -> impl Stream<Item = ChangelogEvent> + Send + 'static {
163        let storage = self.storage.clone();
164
165        stream! {
166            let previous_hash = storage.get_last_hash().await.ok().flatten();
167            let hash_changed = previous_hash.as_ref().map(|p| p != &current_hash).unwrap_or(true);
168            let is_documented = storage.is_documented(&current_hash).await.unwrap_or(false);
169
170            let message = if !hash_changed {
171                "Plexus hash unchanged".to_string()
172            } else if is_documented {
173                "Plexus change is documented".to_string()
174            } else {
175                format!("UNDOCUMENTED: Add changelog entry for hash '{}'", current_hash)
176            };
177
178            yield ChangelogEvent::StartupCheck {
179                current_hash,
180                previous_hash,
181                hash_changed,
182                is_documented,
183                message,
184            };
185        }
186    }
187
188    // ========== Queue Methods ==========
189
190    /// Add a planned change to the queue
191    #[plexus_macros::hub_method(description = "Queue a planned change that systems should implement. Tags identify which systems are affected (e.g., 'frontend', 'api', 'breaking')")]
192    async fn queue_add(
193        &self,
194        description: String,
195        tags: Option<Vec<String>>,
196    ) -> impl Stream<Item = ChangelogEvent> + Send + 'static {
197        let storage = self.storage.clone();
198
199        stream! {
200            let id = uuid::Uuid::new_v4().to_string();
201            let entry = QueueEntry::new(id, description, tags.unwrap_or_default());
202
203            match storage.add_queue_entry(&entry).await {
204                Ok(()) => {
205                    yield ChangelogEvent::QueueAdded { entry };
206                }
207                Err(e) => {
208                    tracing::error!("Failed to add queue entry: {}", e);
209                }
210            }
211        }
212    }
213
214    /// List all queue entries, optionally filtered by tag
215    #[plexus_macros::hub_method(description = "List all queued changes, optionally filtered by tag")]
216    async fn queue_list(
217        &self,
218        tag: Option<String>,
219    ) -> impl Stream<Item = ChangelogEvent> + Send + 'static {
220        let storage = self.storage.clone();
221
222        stream! {
223            match storage.list_queue_entries(tag.as_deref()).await {
224                Ok(entries) => {
225                    yield ChangelogEvent::QueueEntries { entries };
226                }
227                Err(e) => {
228                    tracing::error!("Failed to list queue entries: {}", e);
229                }
230            }
231        }
232    }
233
234    /// List pending queue entries, optionally filtered by tag
235    #[plexus_macros::hub_method(description = "List pending queued changes that haven't been completed yet")]
236    async fn queue_pending(
237        &self,
238        tag: Option<String>,
239    ) -> impl Stream<Item = ChangelogEvent> + Send + 'static {
240        let storage = self.storage.clone();
241
242        stream! {
243            match storage.list_pending_queue_entries(tag.as_deref()).await {
244                Ok(entries) => {
245                    yield ChangelogEvent::QueueEntries { entries };
246                }
247                Err(e) => {
248                    tracing::error!("Failed to list pending queue entries: {}", e);
249                }
250            }
251        }
252    }
253
254    /// Get a specific queue entry by ID
255    #[plexus_macros::hub_method(description = "Get a specific queued change by its ID")]
256    async fn queue_get(
257        &self,
258        id: String,
259    ) -> impl Stream<Item = ChangelogEvent> + Send + 'static {
260        let storage = self.storage.clone();
261
262        stream! {
263            match storage.get_queue_entry(&id).await {
264                Ok(entry) => {
265                    yield ChangelogEvent::QueueItem { entry };
266                }
267                Err(e) => {
268                    tracing::error!("Failed to get queue entry: {}", e);
269                }
270            }
271        }
272    }
273
274    /// Mark a queue entry as complete
275    #[plexus_macros::hub_method(description = "Mark a queued change as complete, linking it to the hash where it was implemented")]
276    async fn queue_complete(
277        &self,
278        id: String,
279        hash: String,
280    ) -> impl Stream<Item = ChangelogEvent> + Send + 'static {
281        let storage = self.storage.clone();
282
283        stream! {
284            match storage.complete_queue_entry(&id, &hash).await {
285                Ok(Some(entry)) => {
286                    yield ChangelogEvent::QueueUpdated { entry };
287                }
288                Ok(None) => {
289                    tracing::warn!("Queue entry not found: {}", id);
290                }
291                Err(e) => {
292                    tracing::error!("Failed to complete queue entry: {}", e);
293                }
294            }
295        }
296    }
297}