Skip to main content

cloudillo_action/
handler.rs

1// SPDX-FileCopyrightText: Szilárd Hajba
2// SPDX-License-Identifier: LGPL-3.0-or-later
3
4use axum::{
5	extract::{ConnectInfo, Path, Query, State},
6	http::StatusCode,
7	Json,
8};
9use serde::{Deserialize, Serialize};
10use std::net::SocketAddr;
11use std::sync::Arc;
12
13use cloudillo_types::hasher::hash;
14use cloudillo_types::utils::decode_jwt_no_verify;
15
16use cloudillo_core::{
17	extract::{Auth, OptionalAuth, OptionalRequestId},
18	rate_limit::RateLimitApi,
19	IdTag,
20};
21use cloudillo_types::auth_adapter::ActionToken;
22use cloudillo_types::meta_adapter;
23use cloudillo_types::types::{self, ApiResponse};
24
25use crate::{
26	dsl::DslEngine,
27	filter::filter_actions_by_visibility,
28	prelude::*,
29	task::{self, ActionVerifierTask, CreateAction},
30};
31
32pub async fn list_actions(
33	State(app): State<App>,
34	tn_id: TnId,
35	IdTag(tenant_id_tag): IdTag,
36	OptionalAuth(maybe_auth): OptionalAuth,
37	OptionalRequestId(req_id): OptionalRequestId,
38	Query(mut opts): Query<meta_adapter::ListActionOptions>,
39) -> ClResult<(StatusCode, Json<ApiResponse<Vec<meta_adapter::ActionView>>>)> {
40	// Filter actions by visibility based on subject's access level
41	let (subject_id_tag, is_authenticated) = match &maybe_auth {
42		Some(auth) => (auth.id_tag.as_ref(), true),
43		None => ("", false),
44	};
45
46	// Set viewer_id_tag for involved filter (conversation filtering)
47	if is_authenticated {
48		opts.viewer_id_tag = Some(subject_id_tag.to_string());
49	}
50
51	let limit = opts.limit.unwrap_or(20) as usize;
52	let sort_field = opts.sort.as_deref().unwrap_or("created");
53
54	let actions = app.meta_adapter.list_actions(tn_id, &opts).await?;
55
56	let mut filtered = filter_actions_by_visibility(
57		&app,
58		tn_id,
59		subject_id_tag,
60		is_authenticated,
61		&tenant_id_tag,
62		actions,
63	)
64	.await?;
65
66	// Check if there are more results (we fetched limit+1)
67	let has_more = filtered.len() > limit;
68	if has_more {
69		filtered.truncate(limit);
70	}
71
72	// Build next cursor from last item
73	let next_cursor = if has_more && !filtered.is_empty() {
74		let last = filtered.last().ok_or(Error::Internal("no last item".into()))?;
75		let sort_value = serde_json::Value::Number(last.created_at.0.into());
76		let cursor = types::CursorData::new(sort_field, sort_value, &last.action_id);
77		Some(cursor.encode())
78	} else {
79		None
80	};
81
82	let response = ApiResponse::with_cursor_pagination(filtered, next_cursor, has_more)
83		.with_req_id(req_id.unwrap_or_default());
84
85	Ok((StatusCode::OK, Json(response)))
86}
87
88#[axum::debug_handler]
89pub async fn post_action(
90	State(app): State<App>,
91	tn_id: TnId,
92	IdTag(id_tag): IdTag,
93	Auth(auth): Auth,
94	OptionalRequestId(req_id): OptionalRequestId,
95	Json(action): Json<CreateAction>,
96) -> ClResult<(StatusCode, Json<ApiResponse<meta_adapter::ActionView>>)> {
97	// Defense-in-depth: apkg:publish scoped keys can only create APKG actions
98	if let Some(ref scope) = auth.scope {
99		if scope.as_ref() == "apkg:publish" && action.typ.as_ref() != "APKG" {
100			return Err(Error::PermissionDenied);
101		}
102	}
103
104	let action_id = task::create_action(&app, tn_id, &id_tag, action).await?;
105	debug!("actionId {:?}", &action_id);
106
107	let list = app
108		.meta_adapter
109		.list_actions(
110			tn_id,
111			&meta_adapter::ListActionOptions {
112				action_id: Some(action_id.to_string()),
113				..Default::default()
114			},
115		)
116		.await?;
117	if list.len() != 1 {
118		return Err(Error::NotFound);
119	}
120
121	let mut response = ApiResponse::new(list[0].clone());
122	if let Some(id) = req_id {
123		response = response.with_req_id(id);
124	}
125
126	Ok((StatusCode::CREATED, Json(response)))
127}
128
129#[derive(Debug, Deserialize)]
130pub struct Inbox {
131	token: String,
132	related: Option<Vec<String>>,
133}
134
135/// Request structure for synchronous action processing (e.g., IDP:REG)
136#[derive(Debug, Serialize, Deserialize)]
137pub struct SyncActionRequest {
138	/// Action type (e.g., "IDP:REG")
139	pub r#type: String,
140	/// Optional subtype for action variants
141	pub subtype: Option<String>,
142	/// Issuer ID tag (who is sending this action)
143	pub issuer: String,
144	/// Target audience (who should receive this action)
145	pub audience: Option<String>,
146	/// Action content (structure depends on action type)
147	pub content: serde_json::Value,
148	/// Optional parent action ID (for threading)
149	pub parent: Option<String>,
150	/// Optional subject
151	pub subject: Option<String>,
152	/// Optional attachments
153	pub attachments: Option<Vec<String>>,
154}
155
156#[axum::debug_handler]
157pub async fn post_inbox(
158	State(app): State<App>,
159	tn_id: TnId,
160	ConnectInfo(addr): ConnectInfo<SocketAddr>,
161	OptionalRequestId(req_id): OptionalRequestId,
162	Json(inbox): Json<Inbox>,
163) -> ClResult<(StatusCode, Json<ApiResponse<()>>)> {
164	// Pre-decode to check action type for PoW requirement
165	// This check happens here so the error is returned synchronously to the client
166	if let Ok(action_preview) = decode_jwt_no_verify::<ActionToken>(&inbox.token) {
167		if action_preview.t.starts_with("CONN") {
168			// Check PoW requirement for CONN actions
169			if let Err(pow_err) = app.rate_limiter.verify_pow(&addr.ip(), &inbox.token) {
170				debug!("CONN action from {} requires PoW: {:?}", action_preview.iss, pow_err);
171				return Err(Error::PreconditionRequired(format!(
172					"Proof of work required: {}",
173					pow_err
174				)));
175			}
176		}
177	}
178
179	let action_id = hash("a", inbox.token.as_bytes());
180
181	// Pass client address for rate limiting integration
182	let client_address: Option<Box<str>> = Some(addr.ip().to_string().into());
183
184	// Store related actions first (they wait for APRV verification before being processed)
185	// Related actions are stored with ack_token pointing to the main action
186	// They will be processed AFTER the main action (APRV) is verified
187	if let Some(related_tokens) = inbox.related {
188		for related_token in related_tokens {
189			let related_id = hash("a", related_token.as_bytes());
190			debug!(
191				"Storing related action {} (waiting for {} verification)",
192				related_id, action_id
193			);
194
195			// Store the related action token with ack_token linking to the APRV
196			// Status 'W' = waiting for APRV verification
197			// The APRV on_receive hook will process these after verifying the APRV
198			if let Err(e) = app
199				.meta_adapter
200				.create_inbound_action(tn_id, &related_id, &related_token, Some(&action_id))
201				.await
202			{
203				// Ignore duplicate errors - action may already exist
204				debug!("Related action {} storage: {} (may be duplicate)", related_id, e);
205			}
206		}
207	}
208
209	// Process main action (APRV) - its on_receive hook will trigger related action processing
210	let task = ActionVerifierTask::new(tn_id, inbox.token.into(), client_address.clone());
211	let _task_id = app.scheduler.task(task).now().await?;
212
213	let response = ApiResponse::new(()).with_req_id(req_id.unwrap_or_default());
214
215	Ok((StatusCode::CREATED, Json(response)))
216}
217
218/// POST /api/inbox/sync - Synchronously process incoming action (e.g., IDP:REG)
219///
220/// This endpoint processes certain action types synchronously and returns the hook's response.
221/// Used for action types like IDP:REG that need immediate feedback.
222/// Uses token-based authentication like /inbox but processes synchronously and returns the hook result.
223#[axum::debug_handler]
224pub async fn post_inbox_sync(
225	State(app): State<App>,
226	tn_id: TnId,
227	IdTag(_id_tag): IdTag,
228	ConnectInfo(socket_addr): ConnectInfo<std::net::SocketAddr>,
229	OptionalRequestId(req_id): OptionalRequestId,
230	Json(inbox): Json<Inbox>,
231) -> ClResult<(StatusCode, Json<ApiResponse<serde_json::Value>>)> {
232	use crate::process::process_inbound_action_token;
233
234	debug!("POST /api/inbox/sync - Processing synchronous action");
235
236	// Pre-decode to check action type for PoW requirement (same as post_inbox)
237	if let Ok(action_preview) = decode_jwt_no_verify::<ActionToken>(&inbox.token) {
238		if action_preview.t.starts_with("CONN") {
239			if let Err(pow_err) = app.rate_limiter.verify_pow(&socket_addr.ip(), &inbox.token) {
240				debug!("CONN action from {} requires PoW: {:?}", action_preview.iss, pow_err);
241				return Err(Error::PreconditionRequired(format!(
242					"Proof of work required: {}",
243					pow_err
244				)));
245			}
246		}
247	}
248
249	// Create action ID from token hash
250	let action_id_box = hash("a", inbox.token.as_bytes());
251	let action_id = action_id_box.to_string();
252
253	// Extract client IP address for hooks that need it (e.g., IDP:REG with "auto" address)
254	let client_address = Some(socket_addr.ip().to_string());
255
256	// Process the action synchronously and get the hook result
257	let hook_result =
258		process_inbound_action_token(&app, tn_id, &action_id, &inbox.token, true, client_address)
259			.await
260			.map_err(|e| {
261				warn!(error = %e, "Failed to process synchronous action");
262				e
263			})?;
264
265	// Extract the return value from the hook result (or empty object if no return value)
266	let response_data = hook_result.unwrap_or(serde_json::json!({}));
267
268	debug!("POST /api/inbox/sync - Synchronous action {} processed successfully", action_id);
269
270	let response = ApiResponse::new(response_data).with_req_id(req_id.unwrap_or_default());
271
272	Ok((StatusCode::CREATED, Json(response)))
273}
274
275/// GET /api/actions/:action_id - Get a single action
276pub async fn get_action_by_id(
277	State(app): State<App>,
278	tn_id: TnId,
279	Path(action_id): Path<String>,
280	OptionalRequestId(req_id): OptionalRequestId,
281) -> ClResult<(StatusCode, Json<ApiResponse<meta_adapter::ActionView>>)> {
282	let action = app.meta_adapter.get_action(tn_id, &action_id).await?;
283
284	match action {
285		Some(a) => {
286			let response = ApiResponse::new(a).with_req_id(req_id.unwrap_or_default());
287			Ok((StatusCode::OK, Json(response)))
288		}
289		None => Err(Error::NotFound),
290	}
291}
292
293/// DELETE /api/actions/:action_id - Delete action
294pub async fn delete_action(
295	State(app): State<App>,
296	tn_id: TnId,
297	Path(action_id): Path<String>,
298	OptionalRequestId(req_id): OptionalRequestId,
299) -> ClResult<(StatusCode, Json<ApiResponse<()>>)> {
300	app.meta_adapter.delete_action(tn_id, &action_id).await?;
301	info!("Deleted action {}", action_id);
302
303	let response = ApiResponse::new(()).with_req_id(req_id.unwrap_or_default());
304
305	Ok((StatusCode::NO_CONTENT, Json(response)))
306}
307
308/// POST /api/actions/:action_id/accept - Accept an action
309pub async fn post_action_accept(
310	State(app): State<App>,
311	tn_id: TnId,
312	Auth(auth): Auth,
313	IdTag(id_tag): IdTag,
314	Path(action_id): Path<String>,
315	OptionalRequestId(req_id): OptionalRequestId,
316) -> ClResult<(StatusCode, Json<ApiResponse<()>>)> {
317	info!("User {} accepting action {}", auth.id_tag, action_id);
318
319	// Fetch the action from database
320	let action = app.meta_adapter.get_action(tn_id, &action_id).await?.ok_or(Error::NotFound)?;
321
322	// Verify the caller is the action's audience (or the tenant owner)
323	if let Some(ref aud) = action.audience {
324		if aud.id_tag.as_ref() != auth.id_tag.as_ref() && id_tag.as_ref() != auth.id_tag.as_ref() {
325			return Err(Error::PermissionDenied);
326		}
327	}
328
329	// Execute DSL on_accept hook if action type has one
330	let dsl = app.ext::<Arc<DslEngine>>()?;
331	if let Some(resolved_type) = dsl.resolve_action_type(&action.typ, action.sub_typ.as_deref()) {
332		use crate::hooks::{HookContext, HookType};
333
334		let hook_context = HookContext::builder()
335			.action_id(&*action.action_id)
336			.action_type(&*action.typ)
337			.subtype(action.sub_typ.clone().map(|s| s.to_string()))
338			.issuer(&*action.issuer.id_tag)
339			.audience(action.audience.as_ref().map(|a| a.id_tag.to_string()))
340			.parent(action.parent_id.clone().map(|s| s.to_string()))
341			.subject(action.subject.clone().map(|s| s.to_string()))
342			.content(action.content.clone())
343			.attachments(
344				action
345					.attachments
346					.clone()
347					.map(|v| v.iter().map(|a| a.file_id.to_string()).collect()),
348			)
349			.created_at(format!("{}", action.created_at.0))
350			.expires_at(action.expires_at.map(|ts| format!("{}", ts.0)))
351			.tenant(i64::from(tn_id.0), &*id_tag, "person")
352			.inbound()
353			.build();
354
355		if let Err(e) =
356			dsl.execute_hook(&app, &resolved_type, HookType::OnAccept, hook_context).await
357		{
358			warn!(
359				action_id = %action_id,
360				action_type = %action.typ,
361				user = %auth.id_tag,
362				tenant_id = %tn_id.0,
363				error = %e,
364				"DSL on_accept hook failed"
365			);
366			// Don't fail the request if hook fails - log and continue
367		}
368	}
369
370	// Update action status to 'A' (Accepted)
371	let update_opts = cloudillo_types::meta_adapter::UpdateActionDataOptions {
372		status: cloudillo_types::types::Patch::Value(crate::status::ACTIVE),
373		..Default::default()
374	};
375	app.meta_adapter.update_action_data(tn_id, &action_id, &update_opts).await?;
376
377	// If action type is approvable, create APRV action to signal approval to the issuer
378	let is_approvable = dsl
379		.get_definition(&action.typ)
380		.is_some_and(|d| d.behavior.approvable.unwrap_or(false));
381
382	if is_approvable {
383		// Create APRV action with:
384		// - audience = action.issuer_tag (original sender receives the approval)
385		// - subject = action_id (the action being approved)
386		// - visibility = 'F' so APRV broadcasts to our followers
387		let aprv_action = CreateAction {
388			typ: "APRV".into(),
389			audience_tag: Some(action.issuer.id_tag.clone()),
390			subject: Some(action_id.clone().into()),
391			visibility: Some('F'),
392			..Default::default()
393		};
394
395		match task::create_action(&app, tn_id, &id_tag, aprv_action).await {
396			Ok(_) => {
397				info!(
398					action_id = %action_id,
399					issuer = %action.issuer.id_tag,
400					"APRV action created for accepted action"
401				);
402			}
403			Err(e) => {
404				warn!(
405					action_id = %action_id,
406					error = %e,
407					"Failed to create APRV action for accepted action"
408				);
409				// Don't fail the accept request if APRV creation fails
410			}
411		}
412	}
413
414	info!(
415		action_id = %action_id,
416		action_type = %action.typ,
417		user = %auth.id_tag,
418		"Action accepted"
419	);
420
421	let response = ApiResponse::new(()).with_req_id(req_id.unwrap_or_default());
422
423	Ok((StatusCode::OK, Json(response)))
424}
425
426/// POST /api/actions/:action_id/reject - Reject an action
427pub async fn post_action_reject(
428	State(app): State<App>,
429	tn_id: TnId,
430	Auth(auth): Auth,
431	IdTag(id_tag): IdTag,
432	Path(action_id): Path<String>,
433	OptionalRequestId(req_id): OptionalRequestId,
434) -> ClResult<(StatusCode, Json<ApiResponse<()>>)> {
435	info!("User {} rejecting action {}", auth.id_tag, action_id);
436
437	// Fetch the action from database
438	let action = app.meta_adapter.get_action(tn_id, &action_id).await?.ok_or(Error::NotFound)?;
439
440	// Verify the caller is the action's audience (or the tenant owner)
441	if let Some(ref aud) = action.audience {
442		if aud.id_tag.as_ref() != auth.id_tag.as_ref() && id_tag.as_ref() != auth.id_tag.as_ref() {
443			return Err(Error::PermissionDenied);
444		}
445	}
446
447	// Execute DSL on_reject hook if action type has one
448	let dsl = app.ext::<Arc<DslEngine>>()?;
449	if let Some(resolved_type) = dsl.resolve_action_type(&action.typ, action.sub_typ.as_deref()) {
450		use crate::hooks::{HookContext, HookType};
451
452		let hook_context = HookContext::builder()
453			.action_id(&*action.action_id)
454			.action_type(&*action.typ)
455			.subtype(action.sub_typ.clone().map(|s| s.to_string()))
456			.issuer(&*action.issuer.id_tag)
457			.audience(action.audience.as_ref().map(|a| a.id_tag.to_string()))
458			.parent(action.parent_id.clone().map(|s| s.to_string()))
459			.subject(action.subject.clone().map(|s| s.to_string()))
460			.content(action.content.clone())
461			.attachments(
462				action
463					.attachments
464					.clone()
465					.map(|v| v.iter().map(|a| a.file_id.to_string()).collect()),
466			)
467			.created_at(format!("{}", action.created_at.0))
468			.expires_at(action.expires_at.map(|ts| format!("{}", ts.0)))
469			.tenant(i64::from(tn_id.0), &*id_tag, "person")
470			.inbound()
471			.build();
472
473		if let Err(e) =
474			dsl.execute_hook(&app, &resolved_type, HookType::OnReject, hook_context).await
475		{
476			warn!(
477				action_id = %action_id,
478				action_type = %action.typ,
479				user = %auth.id_tag,
480				tenant_id = %tn_id.0,
481				error = %e,
482				"DSL on_reject hook failed"
483			);
484			// Don't fail the request if hook fails - log and continue
485		}
486	}
487
488	// Update action status to 'D' (Deleted)
489	let update_opts = cloudillo_types::meta_adapter::UpdateActionDataOptions {
490		status: cloudillo_types::types::Patch::Value(crate::status::DELETED),
491		..Default::default()
492	};
493	app.meta_adapter.update_action_data(tn_id, &action_id, &update_opts).await?;
494
495	info!(
496		action_id = %action_id,
497		action_type = %action.typ,
498		user = %auth.id_tag,
499		"Action rejected"
500	);
501
502	let response = ApiResponse::new(()).with_req_id(req_id.unwrap_or_default());
503
504	Ok((StatusCode::OK, Json(response)))
505}
506
507/// POST /api/actions/:action_id/dismiss - Dismiss a notification
508pub async fn post_action_dismiss(
509	State(app): State<App>,
510	tn_id: TnId,
511	Auth(auth): Auth,
512	Path(action_id): Path<String>,
513	OptionalRequestId(req_id): OptionalRequestId,
514) -> ClResult<(StatusCode, Json<ApiResponse<()>>)> {
515	let action = app.meta_adapter.get_action(tn_id, &action_id).await?.ok_or(Error::NotFound)?;
516
517	match action.status.as_deref().unwrap_or("") {
518		"N" => {
519			let update_opts = cloudillo_types::meta_adapter::UpdateActionDataOptions {
520				status: cloudillo_types::types::Patch::Value(crate::status::ACTIVE),
521				..Default::default()
522			};
523			app.meta_adapter.update_action_data(tn_id, &action_id, &update_opts).await?;
524		}
525		"C" => {
526			return Err(Error::ValidationError(
527				"Cannot dismiss confirmation actions. Use accept or reject.".into(),
528			));
529		}
530		_ => { /* Already 'A' or 'D' — idempotent no-op */ }
531	}
532
533	info!(
534		action_id = %action_id,
535		user = %auth.id_tag,
536		"Action dismissed"
537	);
538
539	Ok((StatusCode::OK, Json(ApiResponse::new(()).with_req_id(req_id.unwrap_or_default()))))
540}
541
542/// POST /api/actions/:action_id/stat - Update action statistics
543#[derive(Debug, Default, Deserialize)]
544pub struct UpdateActionStatRequest {
545	#[serde(default, rename = "commentsRead")]
546	pub comments_read: cloudillo_types::types::Patch<u32>,
547}
548
549pub async fn post_action_stat(
550	State(app): State<App>,
551	tn_id: TnId,
552	Auth(auth): Auth,
553	Path(action_id): Path<String>,
554	OptionalRequestId(req_id): OptionalRequestId,
555	Json(req): Json<UpdateActionStatRequest>,
556) -> ClResult<(StatusCode, Json<ApiResponse<()>>)> {
557	// Update action statistics
558	let opts = cloudillo_types::meta_adapter::UpdateActionDataOptions {
559		comments_read: req.comments_read,
560		..Default::default()
561	};
562
563	app.meta_adapter.update_action_data(tn_id, &action_id, &opts).await?;
564
565	info!("User {} updated stats for action {}", auth.id_tag, action_id);
566
567	let response = ApiResponse::new(()).with_req_id(req_id.unwrap_or_default());
568
569	Ok((StatusCode::OK, Json(response)))
570}
571
572/// Request body for PATCH /api/actions/:action_id (draft update)
573#[derive(Debug, Default, Deserialize)]
574pub struct PatchActionRequest {
575	pub content: Option<serde_json::Value>,
576	pub attachments: Option<Vec<Box<str>>>,
577	pub visibility: Option<char>,
578	pub flags: Option<Box<str>>,
579	pub x: Option<serde_json::Value>,
580}
581
582/// PATCH /api/actions/:action_id - Update a draft action
583pub async fn patch_action(
584	State(app): State<App>,
585	tn_id: TnId,
586	Auth(auth): Auth,
587	IdTag(_id_tag): IdTag,
588	Path(action_id): Path<String>,
589	OptionalRequestId(req_id): OptionalRequestId,
590	Json(req): Json<PatchActionRequest>,
591) -> ClResult<(StatusCode, Json<ApiResponse<meta_adapter::ActionView>>)> {
592	// Only drafts can be updated
593	if !action_id.starts_with('@') {
594		return Err(Error::ValidationError("Only draft actions can be updated".into()));
595	}
596
597	// Verify the action exists and is a draft/scheduled owned by this user
598	let action = app.meta_adapter.get_action(tn_id, &action_id).await?.ok_or(Error::NotFound)?;
599	if !matches!(action.status.as_deref(), Some("R" | "S")) {
600		return Err(Error::ValidationError("Only draft actions can be updated".into()));
601	}
602	if action.issuer.id_tag.as_ref() != auth.id_tag.as_ref() {
603		return Err(Error::PermissionDenied);
604	}
605
606	// Build update options
607	let content_str = req.content.as_ref().and_then(|v| serde_json::to_string(v).ok());
608	let attachments_str = req
609		.attachments
610		.as_ref()
611		.map(|a| a.iter().map(AsRef::as_ref).collect::<Vec<&str>>().join(","));
612
613	let opts = meta_adapter::UpdateActionDataOptions {
614		content: match content_str {
615			Some(s) => cloudillo_types::types::Patch::Value(s),
616			None => cloudillo_types::types::Patch::Undefined,
617		},
618		attachments: match attachments_str {
619			Some(s) => cloudillo_types::types::Patch::Value(s),
620			None => cloudillo_types::types::Patch::Undefined,
621		},
622		visibility: match req.visibility {
623			Some(v) => cloudillo_types::types::Patch::Value(v),
624			None => cloudillo_types::types::Patch::Undefined,
625		},
626		flags: match req.flags {
627			Some(ref f) => cloudillo_types::types::Patch::Value(f.to_string()),
628			None => cloudillo_types::types::Patch::Undefined,
629		},
630		x: match req.x {
631			Some(ref v) => cloudillo_types::types::Patch::Value(v.clone()),
632			None => cloudillo_types::types::Patch::Undefined,
633		},
634		..Default::default()
635	};
636
637	app.meta_adapter.update_action_data(tn_id, &action_id, &opts).await?;
638
639	// Re-fetch the updated action
640	let updated = app.meta_adapter.get_action(tn_id, &action_id).await?.ok_or(Error::NotFound)?;
641
642	let response = ApiResponse::new(updated).with_req_id(req_id.unwrap_or_default());
643	Ok((StatusCode::OK, Json(response)))
644}
645
646/// Request body for POST /api/actions/:action_id/publish
647#[derive(Debug, Default, Deserialize)]
648pub struct PublishDraftRequest {
649	/// Optional scheduled publish time. If set, the draft will be published at this time.
650	#[serde(rename = "publishAt")]
651	pub publish_at: Option<cloudillo_types::types::Timestamp>,
652}
653
654/// POST /api/actions/:action_id/publish - Publish a draft action
655pub async fn publish_draft(
656	State(app): State<App>,
657	tn_id: TnId,
658	Auth(auth): Auth,
659	IdTag(_id_tag): IdTag,
660	Path(action_id): Path<String>,
661	OptionalRequestId(req_id): OptionalRequestId,
662	Json(req): Json<PublishDraftRequest>,
663) -> ClResult<(StatusCode, Json<ApiResponse<meta_adapter::ActionView>>)> {
664	// Only drafts can be published
665	if !action_id.starts_with('@') {
666		return Err(Error::ValidationError("Only draft actions can be published".into()));
667	}
668
669	// Verify the action exists and is a draft/scheduled owned by this user
670	let action = app.meta_adapter.get_action(tn_id, &action_id).await?.ok_or(Error::NotFound)?;
671	if !matches!(action.status.as_deref(), Some("R" | "S")) {
672		return Err(Error::ValidationError("Only draft actions can be published".into()));
673	}
674	if action.issuer.id_tag.as_ref() != auth.id_tag.as_ref() {
675		return Err(Error::PermissionDenied);
676	}
677
678	// Parse a_id from @{a_id}
679	let a_id: u64 = action_id
680		.strip_prefix('@')
681		.ok_or(Error::NotFound)?
682		.parse()
683		.map_err(|_| Error::NotFound)?;
684
685	// Reconstruct CreateAction from the stored draft data
686	let draft_action = task::CreateAction {
687		typ: action.typ.clone(),
688		sub_typ: action.sub_typ.clone(),
689		parent_id: action.parent_id.clone(),
690		audience_tag: action.audience.as_ref().map(|a| a.id_tag.clone()),
691		content: action.content.clone(),
692		attachments: action
693			.attachments
694			.as_ref()
695			.map(|a| a.iter().map(|av| av.file_id.clone()).collect()),
696		subject: action.subject.clone(),
697		expires_at: action.expires_at,
698		visibility: action.visibility,
699		flags: action.flags.clone(),
700		x: action.x.clone(),
701		draft: None,
702		publish_at: None,
703	};
704
705	if let Some(publish_at) = req.publish_at {
706		// Scheduled publish: set status to 'S', update created_at, schedule DraftPublishTask
707		// Different scheduled_at ensures scheduler replaces the old task on reschedule
708		let opts = meta_adapter::UpdateActionDataOptions {
709			status: cloudillo_types::types::Patch::Value('S'),
710			created_at: cloudillo_types::types::Patch::Value(publish_at),
711			..Default::default()
712		};
713		app.meta_adapter.update_action_data(tn_id, &action_id, &opts).await?;
714
715		let publish_task =
716			task::DraftPublishTask::new(tn_id, auth.id_tag.clone(), a_id, draft_action, publish_at);
717		app.scheduler
718			.task(publish_task)
719			.key(format!("draft:{},{}", tn_id, a_id))
720			.at(publish_at)
721			.await?;
722	} else {
723		// Immediate publish: set status to 'P', update created_at to now, schedule ActionCreatorTask
724		// Old DraftPublishTask (if any) will no-op since status is no longer 'S'
725		let now = cloudillo_types::types::Timestamp::now();
726		let opts = meta_adapter::UpdateActionDataOptions {
727			status: cloudillo_types::types::Patch::Value('P'),
728			created_at: cloudillo_types::types::Patch::Value(now),
729			..Default::default()
730		};
731		app.meta_adapter.update_action_data(tn_id, &action_id, &opts).await?;
732
733		let creator_task =
734			task::ActionCreatorTask::new(tn_id, auth.id_tag.clone(), a_id, draft_action);
735		app.scheduler
736			.task(creator_task)
737			.key(format!("{},{}", tn_id, a_id))
738			.schedule()
739			.await?;
740	}
741
742	// Re-fetch the action
743	let updated = app
744		.meta_adapter
745		.list_actions(
746			tn_id,
747			&meta_adapter::ListActionOptions { action_id: Some(action_id), ..Default::default() },
748		)
749		.await?;
750	let result = updated.into_iter().next().ok_or(Error::NotFound)?;
751
752	let response = ApiResponse::new(result).with_req_id(req_id.unwrap_or_default());
753	Ok((StatusCode::OK, Json(response)))
754}
755
756/// POST /api/actions/:action_id/cancel - Cancel a scheduled draft (back to draft status)
757pub async fn cancel_scheduled(
758	State(app): State<App>,
759	tn_id: TnId,
760	Auth(auth): Auth,
761	IdTag(_id_tag): IdTag,
762	Path(action_id): Path<String>,
763	OptionalRequestId(req_id): OptionalRequestId,
764) -> ClResult<(StatusCode, Json<ApiResponse<meta_adapter::ActionView>>)> {
765	// Only drafts can be cancelled
766	if !action_id.starts_with('@') {
767		return Err(Error::ValidationError("Only draft actions can be cancelled".into()));
768	}
769
770	// Verify the action exists and is scheduled, owned by this user
771	let action = app.meta_adapter.get_action(tn_id, &action_id).await?.ok_or(Error::NotFound)?;
772	if action.status.as_deref() != Some("S") {
773		return Err(Error::ValidationError("Only scheduled drafts can be cancelled".into()));
774	}
775	if action.issuer.id_tag.as_ref() != auth.id_tag.as_ref() {
776		return Err(Error::PermissionDenied);
777	}
778
779	// Transition status from 'S' (scheduled) back to 'R' (draft)
780	// The DraftPublishTask will no-op when it fires since status is no longer 'S'
781	let opts = meta_adapter::UpdateActionDataOptions {
782		status: cloudillo_types::types::Patch::Value('R'),
783		..Default::default()
784	};
785	app.meta_adapter.update_action_data(tn_id, &action_id, &opts).await?;
786
787	// Re-fetch the updated action
788	let updated = app.meta_adapter.get_action(tn_id, &action_id).await?.ok_or(Error::NotFound)?;
789
790	let response = ApiResponse::new(updated).with_req_id(req_id.unwrap_or_default());
791	Ok((StatusCode::OK, Json(response)))
792}
793
794// vim: ts=4