plexus_substrate/activations/changelog/
activation.rs1use super::storage::{ChangelogStorage, ChangelogStorageConfig};
2use super::types::{ChangelogEntry, ChangelogEvent, QueueEntry};
3use async_stream::stream;
4use futures::Stream;
5use plexus_macros::activation;
6use std::sync::Arc;
7
8#[derive(Clone)]
10pub struct Changelog {
11 storage: Arc<ChangelogStorage>,
12}
13
14impl Changelog {
15 pub async fn new(config: ChangelogStorageConfig) -> Result<Self, String> {
16 let storage = ChangelogStorage::new(config).await?;
17 Ok(Self {
18 storage: Arc::new(storage),
19 })
20 }
21
22 pub async fn startup_check(&self, current_hash: &str) -> Result<(bool, bool, String), String> {
25 let previous_hash = self.storage.get_last_hash().await?;
26
27 self.storage.set_last_hash(current_hash).await?;
29
30 match previous_hash {
31 None => {
32 Ok((false, true, "First startup - no previous hash recorded".to_string()))
34 }
35 Some(prev) if prev == current_hash => {
36 Ok((false, true, "Plexus hash unchanged".to_string()))
38 }
39 Some(prev) => {
40 let is_documented = self.storage.is_documented(current_hash).await?;
42 let message = if is_documented {
43 let entry = self.storage.get_entry(current_hash).await?.unwrap();
44 format!(
45 "Plexus changed: {} -> {} (documented: {})",
46 prev, current_hash, entry.summary
47 )
48 } else {
49 format!(
50 "UNDOCUMENTED PLEXUS CHANGE: {} -> {}. Add changelog entry for hash '{}'",
51 prev, current_hash, current_hash
52 )
53 };
54 Ok((true, is_documented, message))
55 }
56 }
57 }
58
59 pub fn storage(&self) -> &ChangelogStorage {
61 &self.storage
62 }
63}
64
65#[plexus_macros::activation(namespace = "changelog",
66version = "1.0.0",
67description = "Track and document plexus configuration changes", crate_path = "plexus_core")]
68impl Changelog {
69 #[plexus_macros::method(description = "Add a changelog entry documenting a plexus hash change")]
71 async fn add(
72 &self,
73 hash: String,
74 summary: String,
75 previous_hash: Option<String>,
76 details: Option<Vec<String>>,
77 author: Option<String>,
78 queue_id: Option<String>,
79 ) -> impl Stream<Item = ChangelogEvent> + Send + 'static {
80 let storage = self.storage.clone();
81
82 stream! {
83 let mut entry = ChangelogEntry::new(hash.clone(), previous_hash, summary);
84 if let Some(d) = details {
85 entry = entry.with_details(d);
86 }
87 if let Some(a) = author {
88 entry = entry.with_author(a);
89 }
90 if let Some(q) = queue_id.clone() {
91 entry = entry.with_queue_id(q);
92 }
93
94 match storage.add_entry(&entry).await {
95 Ok(()) => {
96 if let Some(qid) = queue_id {
98 if let Err(e) = storage.complete_queue_entry(&qid, &hash).await {
99 tracing::warn!("Failed to complete queue entry {}: {}", qid, e);
100 }
101 }
102 yield ChangelogEvent::EntryAdded { entry };
103 }
104 Err(e) => {
105 tracing::error!("Failed to add changelog entry: {}", e);
106 }
107 }
108 }
109 }
110
111 #[plexus_macros::method(description = "List all changelog entries (newest first)")]
113 async fn list(&self) -> impl Stream<Item = ChangelogEvent> + Send + 'static {
114 let storage = self.storage.clone();
115
116 stream! {
117 match storage.list_entries().await {
118 Ok(entries) => {
119 yield ChangelogEvent::Entries { entries };
120 }
121 Err(e) => {
122 tracing::error!("Failed to list changelog entries: {}", e);
123 }
124 }
125 }
126 }
127
128 #[plexus_macros::method(description = "Get a changelog entry for a specific plexus hash")]
130 async fn get(
131 &self,
132 hash: String,
133 ) -> impl Stream<Item = ChangelogEvent> + Send + 'static {
134 let storage = self.storage.clone();
135
136 stream! {
137 match storage.get_entry(&hash).await {
138 Ok(entry) => {
139 let is_documented = entry.is_some();
140 let previous_hash = storage.get_last_hash().await.ok().flatten();
141 yield ChangelogEvent::Status {
142 current_hash: hash,
143 previous_hash,
144 is_documented,
145 entry,
146 };
147 }
148 Err(e) => {
149 tracing::error!("Failed to get changelog entry: {}", e);
150 }
151 }
152 }
153 }
154
155 #[plexus_macros::method(description = "Check if the current plexus configuration is documented")]
157 async fn check(
158 &self,
159 current_hash: String,
160 ) -> impl Stream<Item = ChangelogEvent> + Send + 'static {
161 let storage = self.storage.clone();
162
163 stream! {
164 let previous_hash = storage.get_last_hash().await.ok().flatten();
165 let hash_changed = previous_hash.as_ref().map(|p| p != ¤t_hash).unwrap_or(true);
166 let is_documented = storage.is_documented(¤t_hash).await.unwrap_or(false);
167
168 let message = if !hash_changed {
169 "Plexus hash unchanged".to_string()
170 } else if is_documented {
171 "Plexus change is documented".to_string()
172 } else {
173 format!("UNDOCUMENTED: Add changelog entry for hash '{}'", current_hash)
174 };
175
176 yield ChangelogEvent::StartupCheck {
177 current_hash,
178 previous_hash,
179 hash_changed,
180 is_documented,
181 message,
182 };
183 }
184 }
185
186 #[plexus_macros::method(description = "Queue a planned change that systems should implement. Tags identify which systems are affected (e.g., 'frontend', 'api', 'breaking')")]
190 async fn queue_add(
191 &self,
192 description: String,
193 tags: Option<Vec<String>>,
194 ) -> impl Stream<Item = ChangelogEvent> + Send + 'static {
195 let storage = self.storage.clone();
196
197 stream! {
198 let id = uuid::Uuid::new_v4().to_string();
199 let entry = QueueEntry::new(id, description, tags.unwrap_or_default());
200
201 match storage.add_queue_entry(&entry).await {
202 Ok(()) => {
203 yield ChangelogEvent::QueueAdded { entry };
204 }
205 Err(e) => {
206 tracing::error!("Failed to add queue entry: {}", e);
207 }
208 }
209 }
210 }
211
212 #[plexus_macros::method(description = "List all queued changes, optionally filtered by tag")]
214 async fn queue_list(
215 &self,
216 tag: Option<String>,
217 ) -> impl Stream<Item = ChangelogEvent> + Send + 'static {
218 let storage = self.storage.clone();
219
220 stream! {
221 match storage.list_queue_entries(tag.as_deref()).await {
222 Ok(entries) => {
223 yield ChangelogEvent::QueueEntries { entries };
224 }
225 Err(e) => {
226 tracing::error!("Failed to list queue entries: {}", e);
227 }
228 }
229 }
230 }
231
232 #[plexus_macros::method(description = "List pending queued changes that haven't been completed yet")]
234 async fn queue_pending(
235 &self,
236 tag: Option<String>,
237 ) -> impl Stream<Item = ChangelogEvent> + Send + 'static {
238 let storage = self.storage.clone();
239
240 stream! {
241 match storage.list_pending_queue_entries(tag.as_deref()).await {
242 Ok(entries) => {
243 yield ChangelogEvent::QueueEntries { entries };
244 }
245 Err(e) => {
246 tracing::error!("Failed to list pending queue entries: {}", e);
247 }
248 }
249 }
250 }
251
252 #[plexus_macros::method(description = "Get a specific queued change by its ID")]
254 async fn queue_get(
255 &self,
256 id: String,
257 ) -> impl Stream<Item = ChangelogEvent> + Send + 'static {
258 let storage = self.storage.clone();
259
260 stream! {
261 match storage.get_queue_entry(&id).await {
262 Ok(entry) => {
263 yield ChangelogEvent::QueueItem { entry };
264 }
265 Err(e) => {
266 tracing::error!("Failed to get queue entry: {}", e);
267 }
268 }
269 }
270 }
271
272 #[plexus_macros::method(description = "Mark a queued change as complete, linking it to the hash where it was implemented")]
274 async fn queue_complete(
275 &self,
276 id: String,
277 hash: String,
278 ) -> impl Stream<Item = ChangelogEvent> + Send + 'static {
279 let storage = self.storage.clone();
280
281 stream! {
282 match storage.complete_queue_entry(&id, &hash).await {
283 Ok(Some(entry)) => {
284 yield ChangelogEvent::QueueUpdated { entry };
285 }
286 Ok(None) => {
287 tracing::warn!("Queue entry not found: {}", id);
288 }
289 Err(e) => {
290 tracing::error!("Failed to complete queue entry: {}", e);
291 }
292 }
293 }
294 }
295}