plexus_substrate/activations/changelog/
activation.rs1use super::storage::{ChangelogStorage, ChangelogStorageConfig};
2use super::types::{ChangelogEntry, ChangelogEvent, QueueEntry};
3use async_stream::stream;
4use futures::Stream;
5use plexus_macros::hub_methods;
6use std::sync::Arc;
7
8#[derive(Clone)]
10pub struct Changelog {
11 storage: Arc<ChangelogStorage>,
12}
13
14impl Changelog {
15 pub async fn new(config: ChangelogStorageConfig) -> Result<Self, String> {
16 let storage = ChangelogStorage::new(config).await?;
17 Ok(Self {
18 storage: Arc::new(storage),
19 })
20 }
21
22 pub async fn startup_check(&self, current_hash: &str) -> Result<(bool, bool, String), String> {
25 let previous_hash = self.storage.get_last_hash().await?;
26
27 self.storage.set_last_hash(current_hash).await?;
29
30 match previous_hash {
31 None => {
32 Ok((false, true, "First startup - no previous hash recorded".to_string()))
34 }
35 Some(prev) if prev == current_hash => {
36 Ok((false, true, "Plexus hash unchanged".to_string()))
38 }
39 Some(prev) => {
40 let is_documented = self.storage.is_documented(current_hash).await?;
42 let message = if is_documented {
43 let entry = self.storage.get_entry(current_hash).await?.unwrap();
44 format!(
45 "Plexus changed: {} -> {} (documented: {})",
46 prev, current_hash, entry.summary
47 )
48 } else {
49 format!(
50 "UNDOCUMENTED PLEXUS CHANGE: {} -> {}. Add changelog entry for hash '{}'",
51 prev, current_hash, current_hash
52 )
53 };
54 Ok((true, is_documented, message))
55 }
56 }
57 }
58
59 pub fn storage(&self) -> &ChangelogStorage {
61 &self.storage
62 }
63}
64
65#[hub_methods(
66 namespace = "changelog",
67 version = "1.0.0",
68 description = "Track and document plexus configuration changes"
69)]
70impl Changelog {
71 #[plexus_macros::hub_method(description = "Add a changelog entry documenting a plexus hash change")]
73 async fn add(
74 &self,
75 hash: String,
76 summary: String,
77 previous_hash: Option<String>,
78 details: Option<Vec<String>>,
79 author: Option<String>,
80 queue_id: Option<String>,
81 ) -> impl Stream<Item = ChangelogEvent> + Send + 'static {
82 let storage = self.storage.clone();
83
84 stream! {
85 let mut entry = ChangelogEntry::new(hash.clone(), previous_hash, summary);
86 if let Some(d) = details {
87 entry = entry.with_details(d);
88 }
89 if let Some(a) = author {
90 entry = entry.with_author(a);
91 }
92 if let Some(q) = queue_id.clone() {
93 entry = entry.with_queue_id(q);
94 }
95
96 match storage.add_entry(&entry).await {
97 Ok(()) => {
98 if let Some(qid) = queue_id {
100 if let Err(e) = storage.complete_queue_entry(&qid, &hash).await {
101 tracing::warn!("Failed to complete queue entry {}: {}", qid, e);
102 }
103 }
104 yield ChangelogEvent::EntryAdded { entry };
105 }
106 Err(e) => {
107 tracing::error!("Failed to add changelog entry: {}", e);
108 }
109 }
110 }
111 }
112
113 #[plexus_macros::hub_method(description = "List all changelog entries (newest first)")]
115 async fn list(&self) -> impl Stream<Item = ChangelogEvent> + Send + 'static {
116 let storage = self.storage.clone();
117
118 stream! {
119 match storage.list_entries().await {
120 Ok(entries) => {
121 yield ChangelogEvent::Entries { entries };
122 }
123 Err(e) => {
124 tracing::error!("Failed to list changelog entries: {}", e);
125 }
126 }
127 }
128 }
129
130 #[plexus_macros::hub_method(description = "Get a changelog entry for a specific plexus hash")]
132 async fn get(
133 &self,
134 hash: String,
135 ) -> impl Stream<Item = ChangelogEvent> + Send + 'static {
136 let storage = self.storage.clone();
137
138 stream! {
139 match storage.get_entry(&hash).await {
140 Ok(entry) => {
141 let is_documented = entry.is_some();
142 let previous_hash = storage.get_last_hash().await.ok().flatten();
143 yield ChangelogEvent::Status {
144 current_hash: hash,
145 previous_hash,
146 is_documented,
147 entry,
148 };
149 }
150 Err(e) => {
151 tracing::error!("Failed to get changelog entry: {}", e);
152 }
153 }
154 }
155 }
156
157 #[plexus_macros::hub_method(description = "Check if the current plexus configuration is documented")]
159 async fn check(
160 &self,
161 current_hash: String,
162 ) -> impl Stream<Item = ChangelogEvent> + Send + 'static {
163 let storage = self.storage.clone();
164
165 stream! {
166 let previous_hash = storage.get_last_hash().await.ok().flatten();
167 let hash_changed = previous_hash.as_ref().map(|p| p != ¤t_hash).unwrap_or(true);
168 let is_documented = storage.is_documented(¤t_hash).await.unwrap_or(false);
169
170 let message = if !hash_changed {
171 "Plexus hash unchanged".to_string()
172 } else if is_documented {
173 "Plexus change is documented".to_string()
174 } else {
175 format!("UNDOCUMENTED: Add changelog entry for hash '{}'", current_hash)
176 };
177
178 yield ChangelogEvent::StartupCheck {
179 current_hash,
180 previous_hash,
181 hash_changed,
182 is_documented,
183 message,
184 };
185 }
186 }
187
188 #[plexus_macros::hub_method(description = "Queue a planned change that systems should implement. Tags identify which systems are affected (e.g., 'frontend', 'api', 'breaking')")]
192 async fn queue_add(
193 &self,
194 description: String,
195 tags: Option<Vec<String>>,
196 ) -> impl Stream<Item = ChangelogEvent> + Send + 'static {
197 let storage = self.storage.clone();
198
199 stream! {
200 let id = uuid::Uuid::new_v4().to_string();
201 let entry = QueueEntry::new(id, description, tags.unwrap_or_default());
202
203 match storage.add_queue_entry(&entry).await {
204 Ok(()) => {
205 yield ChangelogEvent::QueueAdded { entry };
206 }
207 Err(e) => {
208 tracing::error!("Failed to add queue entry: {}", e);
209 }
210 }
211 }
212 }
213
214 #[plexus_macros::hub_method(description = "List all queued changes, optionally filtered by tag")]
216 async fn queue_list(
217 &self,
218 tag: Option<String>,
219 ) -> impl Stream<Item = ChangelogEvent> + Send + 'static {
220 let storage = self.storage.clone();
221
222 stream! {
223 match storage.list_queue_entries(tag.as_deref()).await {
224 Ok(entries) => {
225 yield ChangelogEvent::QueueEntries { entries };
226 }
227 Err(e) => {
228 tracing::error!("Failed to list queue entries: {}", e);
229 }
230 }
231 }
232 }
233
234 #[plexus_macros::hub_method(description = "List pending queued changes that haven't been completed yet")]
236 async fn queue_pending(
237 &self,
238 tag: Option<String>,
239 ) -> impl Stream<Item = ChangelogEvent> + Send + 'static {
240 let storage = self.storage.clone();
241
242 stream! {
243 match storage.list_pending_queue_entries(tag.as_deref()).await {
244 Ok(entries) => {
245 yield ChangelogEvent::QueueEntries { entries };
246 }
247 Err(e) => {
248 tracing::error!("Failed to list pending queue entries: {}", e);
249 }
250 }
251 }
252 }
253
254 #[plexus_macros::hub_method(description = "Get a specific queued change by its ID")]
256 async fn queue_get(
257 &self,
258 id: String,
259 ) -> impl Stream<Item = ChangelogEvent> + Send + 'static {
260 let storage = self.storage.clone();
261
262 stream! {
263 match storage.get_queue_entry(&id).await {
264 Ok(entry) => {
265 yield ChangelogEvent::QueueItem { entry };
266 }
267 Err(e) => {
268 tracing::error!("Failed to get queue entry: {}", e);
269 }
270 }
271 }
272 }
273
274 #[plexus_macros::hub_method(description = "Mark a queued change as complete, linking it to the hash where it was implemented")]
276 async fn queue_complete(
277 &self,
278 id: String,
279 hash: String,
280 ) -> impl Stream<Item = ChangelogEvent> + Send + 'static {
281 let storage = self.storage.clone();
282
283 stream! {
284 match storage.complete_queue_entry(&id, &hash).await {
285 Ok(Some(entry)) => {
286 yield ChangelogEvent::QueueUpdated { entry };
287 }
288 Ok(None) => {
289 tracing::warn!("Queue entry not found: {}", id);
290 }
291 Err(e) => {
292 tracing::error!("Failed to complete queue entry: {}", e);
293 }
294 }
295 }
296 }
297}