Skip to main content

cloudillo_action/
handler.rs

1use axum::{
2	extract::{ConnectInfo, Path, Query, State},
3	http::StatusCode,
4	Json,
5};
6use serde::{Deserialize, Serialize};
7use std::net::SocketAddr;
8use std::sync::Arc;
9
10use cloudillo_types::utils::decode_jwt_no_verify;
11
12use cloudillo_core::{
13	extract::{Auth, OptionalAuth, OptionalRequestId},
14	rate_limit::RateLimitApi,
15	IdTag,
16};
17use cloudillo_types::auth_adapter::ActionToken;
18use cloudillo_types::hasher::hash;
19use cloudillo_types::meta_adapter;
20use cloudillo_types::types::{self, ApiResponse};
21
22use crate::{
23	dsl::DslEngine,
24	filter::filter_actions_by_visibility,
25	prelude::*,
26	task::{self, ActionVerifierTask, CreateAction},
27};
28
29pub async fn list_actions(
30	State(app): State<App>,
31	tn_id: TnId,
32	IdTag(tenant_id_tag): IdTag,
33	OptionalAuth(maybe_auth): OptionalAuth,
34	OptionalRequestId(req_id): OptionalRequestId,
35	Query(mut opts): Query<meta_adapter::ListActionOptions>,
36) -> ClResult<(StatusCode, Json<ApiResponse<Vec<meta_adapter::ActionView>>>)> {
37	// Filter actions by visibility based on subject's access level
38	let (subject_id_tag, is_authenticated) = match &maybe_auth {
39		Some(auth) => (auth.id_tag.as_ref(), true),
40		None => ("", false),
41	};
42
43	// Set viewer_id_tag for involved filter (conversation filtering)
44	if is_authenticated {
45		opts.viewer_id_tag = Some(subject_id_tag.to_string());
46	}
47
48	let limit = opts.limit.unwrap_or(20) as usize;
49	let sort_field = opts.sort.as_deref().unwrap_or("created");
50
51	let actions = app.meta_adapter.list_actions(tn_id, &opts).await?;
52
53	let mut filtered = filter_actions_by_visibility(
54		&app,
55		tn_id,
56		subject_id_tag,
57		is_authenticated,
58		&tenant_id_tag,
59		actions,
60	)
61	.await?;
62
63	// Check if there are more results (we fetched limit+1)
64	let has_more = filtered.len() > limit;
65	if has_more {
66		filtered.truncate(limit);
67	}
68
69	// Build next cursor from last item
70	let next_cursor = if has_more && !filtered.is_empty() {
71		let last = filtered.last().ok_or(Error::Internal("no last item".into()))?;
72		let sort_value = serde_json::Value::Number(last.created_at.0.into());
73		let cursor = types::CursorData::new(sort_field, sort_value, &last.action_id);
74		Some(cursor.encode())
75	} else {
76		None
77	};
78
79	let response = ApiResponse::with_cursor_pagination(filtered, next_cursor, has_more)
80		.with_req_id(req_id.unwrap_or_default());
81
82	Ok((StatusCode::OK, Json(response)))
83}
84
85#[axum::debug_handler]
86pub async fn post_action(
87	State(app): State<App>,
88	tn_id: TnId,
89	IdTag(id_tag): IdTag,
90	OptionalRequestId(req_id): OptionalRequestId,
91	Json(action): Json<CreateAction>,
92) -> ClResult<(StatusCode, Json<ApiResponse<meta_adapter::ActionView>>)> {
93	let action_id = task::create_action(&app, tn_id, &id_tag, action).await?;
94	debug!("actionId {:?}", &action_id);
95
96	let list = app
97		.meta_adapter
98		.list_actions(
99			tn_id,
100			&meta_adapter::ListActionOptions {
101				action_id: Some(action_id.to_string()),
102				..Default::default()
103			},
104		)
105		.await?;
106	if list.len() != 1 {
107		return Err(Error::NotFound);
108	}
109
110	let mut response = ApiResponse::new(list[0].clone());
111	if let Some(id) = req_id {
112		response = response.with_req_id(id);
113	}
114
115	Ok((StatusCode::CREATED, Json(response)))
116}
117
118#[derive(Deserialize)]
119pub struct Inbox {
120	token: String,
121	related: Option<Vec<String>>,
122}
123
124/// Request structure for synchronous action processing (e.g., IDP:REG)
125#[derive(Serialize, Deserialize)]
126pub struct SyncActionRequest {
127	/// Action type (e.g., "IDP:REG")
128	pub r#type: String,
129	/// Optional subtype for action variants
130	pub subtype: Option<String>,
131	/// Issuer ID tag (who is sending this action)
132	pub issuer: String,
133	/// Target audience (who should receive this action)
134	pub audience: Option<String>,
135	/// Action content (structure depends on action type)
136	pub content: serde_json::Value,
137	/// Optional parent action ID (for threading)
138	pub parent: Option<String>,
139	/// Optional subject
140	pub subject: Option<String>,
141	/// Optional attachments
142	pub attachments: Option<Vec<String>>,
143}
144
145#[axum::debug_handler]
146pub async fn post_inbox(
147	State(app): State<App>,
148	tn_id: TnId,
149	ConnectInfo(addr): ConnectInfo<SocketAddr>,
150	OptionalRequestId(req_id): OptionalRequestId,
151	Json(inbox): Json<Inbox>,
152) -> ClResult<(StatusCode, Json<ApiResponse<()>>)> {
153	// Pre-decode to check action type for PoW requirement
154	// This check happens here so the error is returned synchronously to the client
155	if let Ok(action_preview) = decode_jwt_no_verify::<ActionToken>(&inbox.token) {
156		if action_preview.t.starts_with("CONN") {
157			// Check PoW requirement for CONN actions
158			if let Err(pow_err) = app.rate_limiter.verify_pow(&addr.ip(), &inbox.token) {
159				debug!("CONN action from {} requires PoW: {:?}", action_preview.iss, pow_err);
160				return Err(Error::PreconditionRequired(format!(
161					"Proof of work required: {}",
162					pow_err
163				)));
164			}
165		}
166	}
167
168	let action_id = hash("a", inbox.token.as_bytes());
169
170	// Pass client address for rate limiting integration
171	let client_address: Option<Box<str>> = Some(addr.ip().to_string().into());
172
173	// Store related actions first (they wait for APRV verification before being processed)
174	// Related actions are stored with ack_token pointing to the main action
175	// They will be processed AFTER the main action (APRV) is verified
176	if let Some(related_tokens) = inbox.related {
177		for related_token in related_tokens {
178			let related_id = hash("a", related_token.as_bytes());
179			debug!(
180				"Storing related action {} (waiting for {} verification)",
181				related_id, action_id
182			);
183
184			// Store the related action token with ack_token linking to the APRV
185			// Status 'W' = waiting for APRV verification
186			// The APRV on_receive hook will process these after verifying the APRV
187			if let Err(e) = app
188				.meta_adapter
189				.create_inbound_action(tn_id, &related_id, &related_token, Some(&action_id))
190				.await
191			{
192				// Ignore duplicate errors - action may already exist
193				debug!("Related action {} storage: {} (may be duplicate)", related_id, e);
194			}
195		}
196	}
197
198	// Process main action (APRV) - its on_receive hook will trigger related action processing
199	let task = ActionVerifierTask::new(tn_id, inbox.token.into(), client_address.clone());
200	let _task_id = app.scheduler.task(task).now().await?;
201
202	let response = ApiResponse::new(()).with_req_id(req_id.unwrap_or_default());
203
204	Ok((StatusCode::CREATED, Json(response)))
205}
206
207/// POST /api/inbox/sync - Synchronously process incoming action (e.g., IDP:REG)
208///
209/// This endpoint processes certain action types synchronously and returns the hook's response.
210/// Used for action types like IDP:REG that need immediate feedback.
211/// Uses token-based authentication like /inbox but processes synchronously and returns the hook result.
212#[axum::debug_handler]
213pub async fn post_inbox_sync(
214	State(app): State<App>,
215	tn_id: TnId,
216	IdTag(_id_tag): IdTag,
217	ConnectInfo(socket_addr): ConnectInfo<std::net::SocketAddr>,
218	OptionalRequestId(req_id): OptionalRequestId,
219	Json(inbox): Json<Inbox>,
220) -> ClResult<(StatusCode, Json<ApiResponse<serde_json::Value>>)> {
221	use crate::process::process_inbound_action_token;
222
223	debug!("POST /api/inbox/sync - Processing synchronous action");
224
225	// Create action ID from token hash
226	let action_id_box = hash("a", inbox.token.as_bytes());
227	let action_id = action_id_box.to_string();
228
229	// Extract client IP address for hooks that need it (e.g., IDP:REG with "auto" address)
230	let client_address = Some(socket_addr.ip().to_string());
231
232	// Process the action synchronously and get the hook result
233	let hook_result =
234		process_inbound_action_token(&app, tn_id, &action_id, &inbox.token, true, client_address)
235			.await
236			.map_err(|e| {
237				warn!(error = %e, "Failed to process synchronous action");
238				e
239			})?;
240
241	// Extract the return value from the hook result (or empty object if no return value)
242	let response_data = hook_result.unwrap_or(serde_json::json!({}));
243
244	debug!("POST /api/inbox/sync - Synchronous action {} processed successfully", action_id);
245
246	let response = ApiResponse::new(response_data).with_req_id(req_id.unwrap_or_default());
247
248	Ok((StatusCode::CREATED, Json(response)))
249}
250
251/// GET /api/actions/:action_id - Get a single action
252pub async fn get_action_by_id(
253	State(app): State<App>,
254	_tn_id: TnId,
255	Path(action_id): Path<String>,
256	OptionalRequestId(req_id): OptionalRequestId,
257) -> ClResult<(StatusCode, Json<ApiResponse<meta_adapter::ActionView>>)> {
258	let action = app.meta_adapter.get_action(_tn_id, &action_id).await?;
259
260	match action {
261		Some(a) => {
262			let response = ApiResponse::new(a).with_req_id(req_id.unwrap_or_default());
263			Ok((StatusCode::OK, Json(response)))
264		}
265		None => Err(Error::NotFound),
266	}
267}
268
269/// DELETE /api/actions/:action_id - Delete action
270pub async fn delete_action(
271	State(app): State<App>,
272	_tn_id: TnId,
273	Path(action_id): Path<String>,
274	OptionalRequestId(req_id): OptionalRequestId,
275) -> ClResult<(StatusCode, Json<ApiResponse<()>>)> {
276	app.meta_adapter.delete_action(_tn_id, &action_id).await?;
277	info!("Deleted action {}", action_id);
278
279	let response = ApiResponse::new(()).with_req_id(req_id.unwrap_or_default());
280
281	Ok((StatusCode::NO_CONTENT, Json(response)))
282}
283
284/// POST /api/actions/:action_id/accept - Accept an action
285pub async fn post_action_accept(
286	State(app): State<App>,
287	tn_id: TnId,
288	Auth(auth): Auth,
289	IdTag(id_tag): IdTag,
290	Path(action_id): Path<String>,
291	OptionalRequestId(req_id): OptionalRequestId,
292) -> ClResult<(StatusCode, Json<ApiResponse<()>>)> {
293	info!("User {} accepting action {}", auth.id_tag, action_id);
294
295	// Fetch the action from database
296	let action = app.meta_adapter.get_action(tn_id, &action_id).await?.ok_or(Error::NotFound)?;
297
298	// Execute DSL on_accept hook if action type has one
299	let dsl = app.ext::<Arc<DslEngine>>()?;
300	if let Some(resolved_type) = dsl.resolve_action_type(&action.typ, action.sub_typ.as_deref()) {
301		use crate::hooks::{HookContext, HookType};
302
303		let hook_context = HookContext::builder()
304			.action_id(&*action.action_id)
305			.action_type(&*action.typ)
306			.subtype(action.sub_typ.clone().map(|s| s.to_string()))
307			.issuer(&*action.issuer.id_tag)
308			.audience(action.audience.as_ref().map(|a| a.id_tag.to_string()))
309			.parent(action.parent_id.clone().map(|s| s.to_string()))
310			.subject(action.subject.clone().map(|s| s.to_string()))
311			.content(action.content.clone())
312			.attachments(
313				action
314					.attachments
315					.clone()
316					.map(|v| v.iter().map(|a| a.file_id.to_string()).collect()),
317			)
318			.created_at(format!("{}", action.created_at.0))
319			.expires_at(action.expires_at.map(|ts| format!("{}", ts.0)))
320			.tenant(tn_id.0 as i64, &*id_tag, "person")
321			.inbound()
322			.build();
323
324		if let Err(e) =
325			dsl.execute_hook(&app, &resolved_type, HookType::OnAccept, hook_context).await
326		{
327			warn!(
328				action_id = %action_id,
329				action_type = %action.typ,
330				user = %auth.id_tag,
331				tenant_id = %tn_id.0,
332				error = %e,
333				"DSL on_accept hook failed"
334			);
335			// Don't fail the request if hook fails - log and continue
336		}
337	}
338
339	// Update action status to 'A' (Accepted)
340	let update_opts = cloudillo_types::meta_adapter::UpdateActionDataOptions {
341		status: cloudillo_types::types::Patch::Value(crate::status::ACTIVE),
342		..Default::default()
343	};
344	app.meta_adapter.update_action_data(tn_id, &action_id, &update_opts).await?;
345
346	// If action type is approvable, create APRV action to signal approval to the issuer
347	let is_approvable = dsl
348		.get_definition(&action.typ)
349		.map(|d| d.behavior.approvable.unwrap_or(false))
350		.unwrap_or(false);
351
352	if is_approvable {
353		// Create APRV action with:
354		// - audience = action.issuer_tag (original sender receives the approval)
355		// - subject = action_id (the action being approved)
356		// - visibility = 'F' so APRV broadcasts to our followers
357		let aprv_action = CreateAction {
358			typ: "APRV".into(),
359			audience_tag: Some(action.issuer.id_tag.clone()),
360			subject: Some(action_id.clone().into()),
361			visibility: Some('F'),
362			..Default::default()
363		};
364
365		match task::create_action(&app, tn_id, &id_tag, aprv_action).await {
366			Ok(_) => {
367				info!(
368					action_id = %action_id,
369					issuer = %action.issuer.id_tag,
370					"APRV action created for accepted action"
371				);
372			}
373			Err(e) => {
374				warn!(
375					action_id = %action_id,
376					error = %e,
377					"Failed to create APRV action for accepted action"
378				);
379				// Don't fail the accept request if APRV creation fails
380			}
381		}
382	}
383
384	info!(
385		action_id = %action_id,
386		action_type = %action.typ,
387		user = %auth.id_tag,
388		"Action accepted"
389	);
390
391	let response = ApiResponse::new(()).with_req_id(req_id.unwrap_or_default());
392
393	Ok((StatusCode::OK, Json(response)))
394}
395
396/// POST /api/actions/:action_id/reject - Reject an action
397pub async fn post_action_reject(
398	State(app): State<App>,
399	tn_id: TnId,
400	Auth(auth): Auth,
401	IdTag(id_tag): IdTag,
402	Path(action_id): Path<String>,
403	OptionalRequestId(req_id): OptionalRequestId,
404) -> ClResult<(StatusCode, Json<ApiResponse<()>>)> {
405	info!("User {} rejecting action {}", auth.id_tag, action_id);
406
407	// Fetch the action from database
408	let action = app.meta_adapter.get_action(tn_id, &action_id).await?.ok_or(Error::NotFound)?;
409
410	// Execute DSL on_reject hook if action type has one
411	let dsl = app.ext::<Arc<DslEngine>>()?;
412	if let Some(resolved_type) = dsl.resolve_action_type(&action.typ, action.sub_typ.as_deref()) {
413		use crate::hooks::{HookContext, HookType};
414
415		let hook_context = HookContext::builder()
416			.action_id(&*action.action_id)
417			.action_type(&*action.typ)
418			.subtype(action.sub_typ.clone().map(|s| s.to_string()))
419			.issuer(&*action.issuer.id_tag)
420			.audience(action.audience.as_ref().map(|a| a.id_tag.to_string()))
421			.parent(action.parent_id.clone().map(|s| s.to_string()))
422			.subject(action.subject.clone().map(|s| s.to_string()))
423			.content(action.content.clone())
424			.attachments(
425				action
426					.attachments
427					.clone()
428					.map(|v| v.iter().map(|a| a.file_id.to_string()).collect()),
429			)
430			.created_at(format!("{}", action.created_at.0))
431			.expires_at(action.expires_at.map(|ts| format!("{}", ts.0)))
432			.tenant(tn_id.0 as i64, &*id_tag, "person")
433			.inbound()
434			.build();
435
436		if let Err(e) =
437			dsl.execute_hook(&app, &resolved_type, HookType::OnReject, hook_context).await
438		{
439			warn!(
440				action_id = %action_id,
441				action_type = %action.typ,
442				user = %auth.id_tag,
443				tenant_id = %tn_id.0,
444				error = %e,
445				"DSL on_reject hook failed"
446			);
447			// Don't fail the request if hook fails - log and continue
448		}
449	}
450
451	// Update action status to 'D' (Deleted)
452	let update_opts = cloudillo_types::meta_adapter::UpdateActionDataOptions {
453		status: cloudillo_types::types::Patch::Value(crate::status::DELETED),
454		..Default::default()
455	};
456	app.meta_adapter.update_action_data(tn_id, &action_id, &update_opts).await?;
457
458	info!(
459		action_id = %action_id,
460		action_type = %action.typ,
461		user = %auth.id_tag,
462		"Action rejected"
463	);
464
465	let response = ApiResponse::new(()).with_req_id(req_id.unwrap_or_default());
466
467	Ok((StatusCode::OK, Json(response)))
468}
469
470/// POST /api/actions/:action_id/dismiss - Dismiss a notification
471pub async fn post_action_dismiss(
472	State(app): State<App>,
473	tn_id: TnId,
474	Auth(auth): Auth,
475	Path(action_id): Path<String>,
476	OptionalRequestId(req_id): OptionalRequestId,
477) -> ClResult<(StatusCode, Json<ApiResponse<()>>)> {
478	let action = app.meta_adapter.get_action(tn_id, &action_id).await?.ok_or(Error::NotFound)?;
479
480	match action.status.as_deref().unwrap_or("") {
481		"N" => {
482			let update_opts = cloudillo_types::meta_adapter::UpdateActionDataOptions {
483				status: cloudillo_types::types::Patch::Value(crate::status::ACTIVE),
484				..Default::default()
485			};
486			app.meta_adapter.update_action_data(tn_id, &action_id, &update_opts).await?;
487		}
488		"C" => {
489			return Err(Error::ValidationError(
490				"Cannot dismiss confirmation actions. Use accept or reject.".into(),
491			));
492		}
493		_ => { /* Already 'A' or 'D' — idempotent no-op */ }
494	}
495
496	info!(
497		action_id = %action_id,
498		user = %auth.id_tag,
499		"Action dismissed"
500	);
501
502	Ok((StatusCode::OK, Json(ApiResponse::new(()).with_req_id(req_id.unwrap_or_default()))))
503}
504
505/// POST /api/actions/:action_id/stat - Update action statistics
506#[derive(Default, Deserialize)]
507pub struct UpdateActionStatRequest {
508	#[serde(default, rename = "commentsRead")]
509	pub comments_read: cloudillo_types::types::Patch<u32>,
510}
511
512pub async fn post_action_stat(
513	State(app): State<App>,
514	tn_id: TnId,
515	Auth(auth): Auth,
516	Path(action_id): Path<String>,
517	OptionalRequestId(req_id): OptionalRequestId,
518	Json(req): Json<UpdateActionStatRequest>,
519) -> ClResult<(StatusCode, Json<ApiResponse<()>>)> {
520	// Update action statistics
521	let opts = cloudillo_types::meta_adapter::UpdateActionDataOptions {
522		comments_read: req.comments_read,
523		..Default::default()
524	};
525
526	app.meta_adapter.update_action_data(tn_id, &action_id, &opts).await?;
527
528	info!("User {} updated stats for action {}", auth.id_tag, action_id);
529
530	let response = ApiResponse::new(()).with_req_id(req_id.unwrap_or_default());
531
532	Ok((StatusCode::OK, Json(response)))
533}
534
535/// POST /api/actions/:action_id/reaction - Add reaction to action
536pub async fn post_action_reaction(
537	State(app): State<App>,
538	tn_id: TnId,
539	IdTag(reactor_id_tag): IdTag,
540	Path(action_id): Path<String>,
541	OptionalRequestId(req_id): OptionalRequestId,
542	Json(reaction): Json<types::ReactionRequest>,
543) -> ClResult<(StatusCode, Json<ApiResponse<types::ReactionResponse>>)> {
544	// Verify action exists
545	let _action = app.meta_adapter.get_action(tn_id, &action_id).await?.ok_or(Error::NotFound)?;
546
547	// Add reaction
548	app.meta_adapter
549		.add_reaction(
550			tn_id,
551			&action_id,
552			&reactor_id_tag,
553			&reaction.r#type,
554			reaction.content.as_deref(),
555		)
556		.await?;
557
558	// Generate reaction ID (simple hash)
559	let reaction_id =
560		hash("r", format!("{}:{}:{}", action_id, reactor_id_tag, reaction.r#type).as_bytes());
561
562	let reaction_response = types::ReactionResponse {
563		id: reaction_id.to_string(),
564		action_id,
565		reactor_id_tag: reactor_id_tag.into(),
566		r#type: reaction.r#type,
567		content: reaction.content,
568		created_at: cloudillo_types::types::Timestamp::now(),
569	};
570
571	let response = ApiResponse::new(reaction_response).with_req_id(req_id.unwrap_or_default());
572
573	Ok((StatusCode::CREATED, Json(response)))
574}
575
576// vim: ts=4