Skip to main content

plexus_substrate/activations/changelog/
activation.rs

1use super::storage::{ChangelogStorage, ChangelogStorageConfig};
2use super::types::{ChangelogEntry, ChangelogEvent, QueueEntry};
3use async_stream::stream;
4use futures::Stream;
5use plexus_macros::activation;
6use std::sync::Arc;
7
8/// Changelog activation - tracks Plexus RPC server hash changes and enforces documentation
9#[derive(Clone)]
10pub struct Changelog {
11    storage: Arc<ChangelogStorage>,
12}
13
14impl Changelog {
15    pub async fn new(config: ChangelogStorageConfig) -> Result<Self, String> {
16        let storage = ChangelogStorage::new(config).await?;
17        Ok(Self {
18            storage: Arc::new(storage),
19        })
20    }
21
22    /// Run startup check - called when Plexus RPC server starts
23    /// Returns (hash_changed, is_documented, message)
24    pub async fn startup_check(&self, current_hash: &str) -> Result<(bool, bool, String), String> {
25        let previous_hash = self.storage.get_last_hash().await?;
26
27        // Update the stored hash to current
28        self.storage.set_last_hash(current_hash).await?;
29
30        match previous_hash {
31            None => {
32                // First run - no previous hash
33                Ok((false, true, "First startup - no previous hash recorded".to_string()))
34            }
35            Some(prev) if prev == current_hash => {
36                // No change
37                Ok((false, true, "Plexus hash unchanged".to_string()))
38            }
39            Some(prev) => {
40                // Hash changed - check if documented
41                let is_documented = self.storage.is_documented(current_hash).await?;
42                let message = if is_documented {
43                    let entry = self.storage.get_entry(current_hash).await?.unwrap();
44                    format!(
45                        "Plexus changed: {} -> {} (documented: {})",
46                        prev, current_hash, entry.summary
47                    )
48                } else {
49                    format!(
50                        "UNDOCUMENTED PLEXUS CHANGE: {} -> {}. Add changelog entry for hash '{}'",
51                        prev, current_hash, current_hash
52                    )
53                };
54                Ok((true, is_documented, message))
55            }
56        }
57    }
58
59    /// Get the storage for direct access (used by builder for startup check)
60    pub fn storage(&self) -> &ChangelogStorage {
61        &self.storage
62    }
63}
64
65#[plexus_macros::activation(namespace = "changelog",
66version = "1.0.0",
67description = "Track and document plexus configuration changes", crate_path = "plexus_core")]
68impl Changelog {
69    /// Add a changelog entry for a plexus hash transition
70    #[plexus_macros::method(description = "Add a changelog entry documenting a plexus hash change")]
71    async fn add(
72        &self,
73        hash: String,
74        summary: String,
75        previous_hash: Option<String>,
76        details: Option<Vec<String>>,
77        author: Option<String>,
78        queue_id: Option<String>,
79    ) -> impl Stream<Item = ChangelogEvent> + Send + 'static {
80        let storage = self.storage.clone();
81
82        stream! {
83            let mut entry = ChangelogEntry::new(hash.clone(), previous_hash, summary);
84            if let Some(d) = details {
85                entry = entry.with_details(d);
86            }
87            if let Some(a) = author {
88                entry = entry.with_author(a);
89            }
90            if let Some(q) = queue_id.clone() {
91                entry = entry.with_queue_id(q);
92            }
93
94            match storage.add_entry(&entry).await {
95                Ok(()) => {
96                    // If this completes a queue item, mark it complete
97                    if let Some(qid) = queue_id {
98                        if let Err(e) = storage.complete_queue_entry(&qid, &hash).await {
99                            tracing::warn!("Failed to complete queue entry {}: {}", qid, e);
100                        }
101                    }
102                    yield ChangelogEvent::EntryAdded { entry };
103                }
104                Err(e) => {
105                    tracing::error!("Failed to add changelog entry: {}", e);
106                }
107            }
108        }
109    }
110
111    /// List all changelog entries
112    #[plexus_macros::method(description = "List all changelog entries (newest first)")]
113    async fn list(&self) -> impl Stream<Item = ChangelogEvent> + Send + 'static {
114        let storage = self.storage.clone();
115
116        stream! {
117            match storage.list_entries().await {
118                Ok(entries) => {
119                    yield ChangelogEvent::Entries { entries };
120                }
121                Err(e) => {
122                    tracing::error!("Failed to list changelog entries: {}", e);
123                }
124            }
125        }
126    }
127
128    /// Get a specific changelog entry by hash
129    #[plexus_macros::method(description = "Get a changelog entry for a specific plexus hash")]
130    async fn get(
131        &self,
132        hash: String,
133    ) -> impl Stream<Item = ChangelogEvent> + Send + 'static {
134        let storage = self.storage.clone();
135
136        stream! {
137            match storage.get_entry(&hash).await {
138                Ok(entry) => {
139                    let is_documented = entry.is_some();
140                    let previous_hash = storage.get_last_hash().await.ok().flatten();
141                    yield ChangelogEvent::Status {
142                        current_hash: hash,
143                        previous_hash,
144                        is_documented,
145                        entry,
146                    };
147                }
148                Err(e) => {
149                    tracing::error!("Failed to get changelog entry: {}", e);
150                }
151            }
152        }
153    }
154
155    /// Check current status - is the current plexus hash documented?
156    #[plexus_macros::method(description = "Check if the current plexus configuration is documented")]
157    async fn check(
158        &self,
159        current_hash: String,
160    ) -> impl Stream<Item = ChangelogEvent> + Send + 'static {
161        let storage = self.storage.clone();
162
163        stream! {
164            let previous_hash = storage.get_last_hash().await.ok().flatten();
165            let hash_changed = previous_hash.as_ref().map(|p| p != &current_hash).unwrap_or(true);
166            let is_documented = storage.is_documented(&current_hash).await.unwrap_or(false);
167
168            let message = if !hash_changed {
169                "Plexus hash unchanged".to_string()
170            } else if is_documented {
171                "Plexus change is documented".to_string()
172            } else {
173                format!("UNDOCUMENTED: Add changelog entry for hash '{}'", current_hash)
174            };
175
176            yield ChangelogEvent::StartupCheck {
177                current_hash,
178                previous_hash,
179                hash_changed,
180                is_documented,
181                message,
182            };
183        }
184    }
185
186    // ========== Queue Methods ==========
187
188    /// Add a planned change to the queue
189    #[plexus_macros::method(description = "Queue a planned change that systems should implement. Tags identify which systems are affected (e.g., 'frontend', 'api', 'breaking')")]
190    async fn queue_add(
191        &self,
192        description: String,
193        tags: Option<Vec<String>>,
194    ) -> impl Stream<Item = ChangelogEvent> + Send + 'static {
195        let storage = self.storage.clone();
196
197        stream! {
198            let id = uuid::Uuid::new_v4().to_string();
199            let entry = QueueEntry::new(id, description, tags.unwrap_or_default());
200
201            match storage.add_queue_entry(&entry).await {
202                Ok(()) => {
203                    yield ChangelogEvent::QueueAdded { entry };
204                }
205                Err(e) => {
206                    tracing::error!("Failed to add queue entry: {}", e);
207                }
208            }
209        }
210    }
211
212    /// List all queue entries, optionally filtered by tag
213    #[plexus_macros::method(description = "List all queued changes, optionally filtered by tag")]
214    async fn queue_list(
215        &self,
216        tag: Option<String>,
217    ) -> impl Stream<Item = ChangelogEvent> + Send + 'static {
218        let storage = self.storage.clone();
219
220        stream! {
221            match storage.list_queue_entries(tag.as_deref()).await {
222                Ok(entries) => {
223                    yield ChangelogEvent::QueueEntries { entries };
224                }
225                Err(e) => {
226                    tracing::error!("Failed to list queue entries: {}", e);
227                }
228            }
229        }
230    }
231
232    /// List pending queue entries, optionally filtered by tag
233    #[plexus_macros::method(description = "List pending queued changes that haven't been completed yet")]
234    async fn queue_pending(
235        &self,
236        tag: Option<String>,
237    ) -> impl Stream<Item = ChangelogEvent> + Send + 'static {
238        let storage = self.storage.clone();
239
240        stream! {
241            match storage.list_pending_queue_entries(tag.as_deref()).await {
242                Ok(entries) => {
243                    yield ChangelogEvent::QueueEntries { entries };
244                }
245                Err(e) => {
246                    tracing::error!("Failed to list pending queue entries: {}", e);
247                }
248            }
249        }
250    }
251
252    /// Get a specific queue entry by ID
253    #[plexus_macros::method(description = "Get a specific queued change by its ID")]
254    async fn queue_get(
255        &self,
256        id: String,
257    ) -> impl Stream<Item = ChangelogEvent> + Send + 'static {
258        let storage = self.storage.clone();
259
260        stream! {
261            match storage.get_queue_entry(&id).await {
262                Ok(entry) => {
263                    yield ChangelogEvent::QueueItem { entry };
264                }
265                Err(e) => {
266                    tracing::error!("Failed to get queue entry: {}", e);
267                }
268            }
269        }
270    }
271
272    /// Mark a queue entry as complete
273    #[plexus_macros::method(description = "Mark a queued change as complete, linking it to the hash where it was implemented")]
274    async fn queue_complete(
275        &self,
276        id: String,
277        hash: String,
278    ) -> impl Stream<Item = ChangelogEvent> + Send + 'static {
279        let storage = self.storage.clone();
280
281        stream! {
282            match storage.complete_queue_entry(&id, &hash).await {
283                Ok(Some(entry)) => {
284                    yield ChangelogEvent::QueueUpdated { entry };
285                }
286                Ok(None) => {
287                    tracing::warn!("Queue entry not found: {}", id);
288                }
289                Err(e) => {
290                    tracing::error!("Failed to complete queue entry: {}", e);
291                }
292            }
293        }
294    }
295}