1use axum::{
2 extract::{ConnectInfo, Path, Query, State},
3 http::StatusCode,
4 Json,
5};
6use serde::{Deserialize, Serialize};
7use std::net::SocketAddr;
8use std::sync::Arc;
9
10use cloudillo_types::utils::decode_jwt_no_verify;
11
12use cloudillo_core::{
13 extract::{Auth, OptionalAuth, OptionalRequestId},
14 rate_limit::RateLimitApi,
15 IdTag,
16};
17use cloudillo_types::auth_adapter::ActionToken;
18use cloudillo_types::hasher::hash;
19use cloudillo_types::meta_adapter;
20use cloudillo_types::types::{self, ApiResponse};
21
22use crate::{
23 dsl::DslEngine,
24 filter::filter_actions_by_visibility,
25 prelude::*,
26 task::{self, ActionVerifierTask, CreateAction},
27};
28
29pub async fn list_actions(
30 State(app): State<App>,
31 tn_id: TnId,
32 IdTag(tenant_id_tag): IdTag,
33 OptionalAuth(maybe_auth): OptionalAuth,
34 OptionalRequestId(req_id): OptionalRequestId,
35 Query(mut opts): Query<meta_adapter::ListActionOptions>,
36) -> ClResult<(StatusCode, Json<ApiResponse<Vec<meta_adapter::ActionView>>>)> {
37 let (subject_id_tag, is_authenticated) = match &maybe_auth {
39 Some(auth) => (auth.id_tag.as_ref(), true),
40 None => ("", false),
41 };
42
43 if is_authenticated {
45 opts.viewer_id_tag = Some(subject_id_tag.to_string());
46 }
47
48 let limit = opts.limit.unwrap_or(20) as usize;
49 let sort_field = opts.sort.as_deref().unwrap_or("created");
50
51 let actions = app.meta_adapter.list_actions(tn_id, &opts).await?;
52
53 let mut filtered = filter_actions_by_visibility(
54 &app,
55 tn_id,
56 subject_id_tag,
57 is_authenticated,
58 &tenant_id_tag,
59 actions,
60 )
61 .await?;
62
63 let has_more = filtered.len() > limit;
65 if has_more {
66 filtered.truncate(limit);
67 }
68
69 let next_cursor = if has_more && !filtered.is_empty() {
71 let last = filtered.last().ok_or(Error::Internal("no last item".into()))?;
72 let sort_value = serde_json::Value::Number(last.created_at.0.into());
73 let cursor = types::CursorData::new(sort_field, sort_value, &last.action_id);
74 Some(cursor.encode())
75 } else {
76 None
77 };
78
79 let response = ApiResponse::with_cursor_pagination(filtered, next_cursor, has_more)
80 .with_req_id(req_id.unwrap_or_default());
81
82 Ok((StatusCode::OK, Json(response)))
83}
84
85#[axum::debug_handler]
86pub async fn post_action(
87 State(app): State<App>,
88 tn_id: TnId,
89 IdTag(id_tag): IdTag,
90 OptionalRequestId(req_id): OptionalRequestId,
91 Json(action): Json<CreateAction>,
92) -> ClResult<(StatusCode, Json<ApiResponse<meta_adapter::ActionView>>)> {
93 let action_id = task::create_action(&app, tn_id, &id_tag, action).await?;
94 debug!("actionId {:?}", &action_id);
95
96 let list = app
97 .meta_adapter
98 .list_actions(
99 tn_id,
100 &meta_adapter::ListActionOptions {
101 action_id: Some(action_id.to_string()),
102 ..Default::default()
103 },
104 )
105 .await?;
106 if list.len() != 1 {
107 return Err(Error::NotFound);
108 }
109
110 let mut response = ApiResponse::new(list[0].clone());
111 if let Some(id) = req_id {
112 response = response.with_req_id(id);
113 }
114
115 Ok((StatusCode::CREATED, Json(response)))
116}
117
118#[derive(Deserialize)]
119pub struct Inbox {
120 token: String,
121 related: Option<Vec<String>>,
122}
123
124#[derive(Serialize, Deserialize)]
126pub struct SyncActionRequest {
127 pub r#type: String,
129 pub subtype: Option<String>,
131 pub issuer: String,
133 pub audience: Option<String>,
135 pub content: serde_json::Value,
137 pub parent: Option<String>,
139 pub subject: Option<String>,
141 pub attachments: Option<Vec<String>>,
143}
144
145#[axum::debug_handler]
146pub async fn post_inbox(
147 State(app): State<App>,
148 tn_id: TnId,
149 ConnectInfo(addr): ConnectInfo<SocketAddr>,
150 OptionalRequestId(req_id): OptionalRequestId,
151 Json(inbox): Json<Inbox>,
152) -> ClResult<(StatusCode, Json<ApiResponse<()>>)> {
153 if let Ok(action_preview) = decode_jwt_no_verify::<ActionToken>(&inbox.token) {
156 if action_preview.t.starts_with("CONN") {
157 if let Err(pow_err) = app.rate_limiter.verify_pow(&addr.ip(), &inbox.token) {
159 debug!("CONN action from {} requires PoW: {:?}", action_preview.iss, pow_err);
160 return Err(Error::PreconditionRequired(format!(
161 "Proof of work required: {}",
162 pow_err
163 )));
164 }
165 }
166 }
167
168 let action_id = hash("a", inbox.token.as_bytes());
169
170 let client_address: Option<Box<str>> = Some(addr.ip().to_string().into());
172
173 if let Some(related_tokens) = inbox.related {
177 for related_token in related_tokens {
178 let related_id = hash("a", related_token.as_bytes());
179 debug!(
180 "Storing related action {} (waiting for {} verification)",
181 related_id, action_id
182 );
183
184 if let Err(e) = app
188 .meta_adapter
189 .create_inbound_action(tn_id, &related_id, &related_token, Some(&action_id))
190 .await
191 {
192 debug!("Related action {} storage: {} (may be duplicate)", related_id, e);
194 }
195 }
196 }
197
198 let task = ActionVerifierTask::new(tn_id, inbox.token.into(), client_address.clone());
200 let _task_id = app.scheduler.task(task).now().await?;
201
202 let response = ApiResponse::new(()).with_req_id(req_id.unwrap_or_default());
203
204 Ok((StatusCode::CREATED, Json(response)))
205}
206
207#[axum::debug_handler]
213pub async fn post_inbox_sync(
214 State(app): State<App>,
215 tn_id: TnId,
216 IdTag(_id_tag): IdTag,
217 ConnectInfo(socket_addr): ConnectInfo<std::net::SocketAddr>,
218 OptionalRequestId(req_id): OptionalRequestId,
219 Json(inbox): Json<Inbox>,
220) -> ClResult<(StatusCode, Json<ApiResponse<serde_json::Value>>)> {
221 use crate::process::process_inbound_action_token;
222
223 debug!("POST /api/inbox/sync - Processing synchronous action");
224
225 let action_id_box = hash("a", inbox.token.as_bytes());
227 let action_id = action_id_box.to_string();
228
229 let client_address = Some(socket_addr.ip().to_string());
231
232 let hook_result =
234 process_inbound_action_token(&app, tn_id, &action_id, &inbox.token, true, client_address)
235 .await
236 .map_err(|e| {
237 warn!(error = %e, "Failed to process synchronous action");
238 e
239 })?;
240
241 let response_data = hook_result.unwrap_or(serde_json::json!({}));
243
244 debug!("POST /api/inbox/sync - Synchronous action {} processed successfully", action_id);
245
246 let response = ApiResponse::new(response_data).with_req_id(req_id.unwrap_or_default());
247
248 Ok((StatusCode::CREATED, Json(response)))
249}
250
251pub async fn get_action_by_id(
253 State(app): State<App>,
254 _tn_id: TnId,
255 Path(action_id): Path<String>,
256 OptionalRequestId(req_id): OptionalRequestId,
257) -> ClResult<(StatusCode, Json<ApiResponse<meta_adapter::ActionView>>)> {
258 let action = app.meta_adapter.get_action(_tn_id, &action_id).await?;
259
260 match action {
261 Some(a) => {
262 let response = ApiResponse::new(a).with_req_id(req_id.unwrap_or_default());
263 Ok((StatusCode::OK, Json(response)))
264 }
265 None => Err(Error::NotFound),
266 }
267}
268
269pub async fn delete_action(
271 State(app): State<App>,
272 _tn_id: TnId,
273 Path(action_id): Path<String>,
274 OptionalRequestId(req_id): OptionalRequestId,
275) -> ClResult<(StatusCode, Json<ApiResponse<()>>)> {
276 app.meta_adapter.delete_action(_tn_id, &action_id).await?;
277 info!("Deleted action {}", action_id);
278
279 let response = ApiResponse::new(()).with_req_id(req_id.unwrap_or_default());
280
281 Ok((StatusCode::NO_CONTENT, Json(response)))
282}
283
284pub async fn post_action_accept(
286 State(app): State<App>,
287 tn_id: TnId,
288 Auth(auth): Auth,
289 IdTag(id_tag): IdTag,
290 Path(action_id): Path<String>,
291 OptionalRequestId(req_id): OptionalRequestId,
292) -> ClResult<(StatusCode, Json<ApiResponse<()>>)> {
293 info!("User {} accepting action {}", auth.id_tag, action_id);
294
295 let action = app.meta_adapter.get_action(tn_id, &action_id).await?.ok_or(Error::NotFound)?;
297
298 let dsl = app.ext::<Arc<DslEngine>>()?;
300 if let Some(resolved_type) = dsl.resolve_action_type(&action.typ, action.sub_typ.as_deref()) {
301 use crate::hooks::{HookContext, HookType};
302
303 let hook_context = HookContext::builder()
304 .action_id(&*action.action_id)
305 .action_type(&*action.typ)
306 .subtype(action.sub_typ.clone().map(|s| s.to_string()))
307 .issuer(&*action.issuer.id_tag)
308 .audience(action.audience.as_ref().map(|a| a.id_tag.to_string()))
309 .parent(action.parent_id.clone().map(|s| s.to_string()))
310 .subject(action.subject.clone().map(|s| s.to_string()))
311 .content(action.content.clone())
312 .attachments(
313 action
314 .attachments
315 .clone()
316 .map(|v| v.iter().map(|a| a.file_id.to_string()).collect()),
317 )
318 .created_at(format!("{}", action.created_at.0))
319 .expires_at(action.expires_at.map(|ts| format!("{}", ts.0)))
320 .tenant(tn_id.0 as i64, &*id_tag, "person")
321 .inbound()
322 .build();
323
324 if let Err(e) =
325 dsl.execute_hook(&app, &resolved_type, HookType::OnAccept, hook_context).await
326 {
327 warn!(
328 action_id = %action_id,
329 action_type = %action.typ,
330 user = %auth.id_tag,
331 tenant_id = %tn_id.0,
332 error = %e,
333 "DSL on_accept hook failed"
334 );
335 }
337 }
338
339 let update_opts = cloudillo_types::meta_adapter::UpdateActionDataOptions {
341 status: cloudillo_types::types::Patch::Value(crate::status::ACTIVE),
342 ..Default::default()
343 };
344 app.meta_adapter.update_action_data(tn_id, &action_id, &update_opts).await?;
345
346 let is_approvable = dsl
348 .get_definition(&action.typ)
349 .map(|d| d.behavior.approvable.unwrap_or(false))
350 .unwrap_or(false);
351
352 if is_approvable {
353 let aprv_action = CreateAction {
358 typ: "APRV".into(),
359 audience_tag: Some(action.issuer.id_tag.clone()),
360 subject: Some(action_id.clone().into()),
361 visibility: Some('F'),
362 ..Default::default()
363 };
364
365 match task::create_action(&app, tn_id, &id_tag, aprv_action).await {
366 Ok(_) => {
367 info!(
368 action_id = %action_id,
369 issuer = %action.issuer.id_tag,
370 "APRV action created for accepted action"
371 );
372 }
373 Err(e) => {
374 warn!(
375 action_id = %action_id,
376 error = %e,
377 "Failed to create APRV action for accepted action"
378 );
379 }
381 }
382 }
383
384 info!(
385 action_id = %action_id,
386 action_type = %action.typ,
387 user = %auth.id_tag,
388 "Action accepted"
389 );
390
391 let response = ApiResponse::new(()).with_req_id(req_id.unwrap_or_default());
392
393 Ok((StatusCode::OK, Json(response)))
394}
395
396pub async fn post_action_reject(
398 State(app): State<App>,
399 tn_id: TnId,
400 Auth(auth): Auth,
401 IdTag(id_tag): IdTag,
402 Path(action_id): Path<String>,
403 OptionalRequestId(req_id): OptionalRequestId,
404) -> ClResult<(StatusCode, Json<ApiResponse<()>>)> {
405 info!("User {} rejecting action {}", auth.id_tag, action_id);
406
407 let action = app.meta_adapter.get_action(tn_id, &action_id).await?.ok_or(Error::NotFound)?;
409
410 let dsl = app.ext::<Arc<DslEngine>>()?;
412 if let Some(resolved_type) = dsl.resolve_action_type(&action.typ, action.sub_typ.as_deref()) {
413 use crate::hooks::{HookContext, HookType};
414
415 let hook_context = HookContext::builder()
416 .action_id(&*action.action_id)
417 .action_type(&*action.typ)
418 .subtype(action.sub_typ.clone().map(|s| s.to_string()))
419 .issuer(&*action.issuer.id_tag)
420 .audience(action.audience.as_ref().map(|a| a.id_tag.to_string()))
421 .parent(action.parent_id.clone().map(|s| s.to_string()))
422 .subject(action.subject.clone().map(|s| s.to_string()))
423 .content(action.content.clone())
424 .attachments(
425 action
426 .attachments
427 .clone()
428 .map(|v| v.iter().map(|a| a.file_id.to_string()).collect()),
429 )
430 .created_at(format!("{}", action.created_at.0))
431 .expires_at(action.expires_at.map(|ts| format!("{}", ts.0)))
432 .tenant(tn_id.0 as i64, &*id_tag, "person")
433 .inbound()
434 .build();
435
436 if let Err(e) =
437 dsl.execute_hook(&app, &resolved_type, HookType::OnReject, hook_context).await
438 {
439 warn!(
440 action_id = %action_id,
441 action_type = %action.typ,
442 user = %auth.id_tag,
443 tenant_id = %tn_id.0,
444 error = %e,
445 "DSL on_reject hook failed"
446 );
447 }
449 }
450
451 let update_opts = cloudillo_types::meta_adapter::UpdateActionDataOptions {
453 status: cloudillo_types::types::Patch::Value(crate::status::DELETED),
454 ..Default::default()
455 };
456 app.meta_adapter.update_action_data(tn_id, &action_id, &update_opts).await?;
457
458 info!(
459 action_id = %action_id,
460 action_type = %action.typ,
461 user = %auth.id_tag,
462 "Action rejected"
463 );
464
465 let response = ApiResponse::new(()).with_req_id(req_id.unwrap_or_default());
466
467 Ok((StatusCode::OK, Json(response)))
468}
469
470pub async fn post_action_dismiss(
472 State(app): State<App>,
473 tn_id: TnId,
474 Auth(auth): Auth,
475 Path(action_id): Path<String>,
476 OptionalRequestId(req_id): OptionalRequestId,
477) -> ClResult<(StatusCode, Json<ApiResponse<()>>)> {
478 let action = app.meta_adapter.get_action(tn_id, &action_id).await?.ok_or(Error::NotFound)?;
479
480 match action.status.as_deref().unwrap_or("") {
481 "N" => {
482 let update_opts = cloudillo_types::meta_adapter::UpdateActionDataOptions {
483 status: cloudillo_types::types::Patch::Value(crate::status::ACTIVE),
484 ..Default::default()
485 };
486 app.meta_adapter.update_action_data(tn_id, &action_id, &update_opts).await?;
487 }
488 "C" => {
489 return Err(Error::ValidationError(
490 "Cannot dismiss confirmation actions. Use accept or reject.".into(),
491 ));
492 }
493 _ => { }
494 }
495
496 info!(
497 action_id = %action_id,
498 user = %auth.id_tag,
499 "Action dismissed"
500 );
501
502 Ok((StatusCode::OK, Json(ApiResponse::new(()).with_req_id(req_id.unwrap_or_default()))))
503}
504
505#[derive(Default, Deserialize)]
507pub struct UpdateActionStatRequest {
508 #[serde(default, rename = "commentsRead")]
509 pub comments_read: cloudillo_types::types::Patch<u32>,
510}
511
512pub async fn post_action_stat(
513 State(app): State<App>,
514 tn_id: TnId,
515 Auth(auth): Auth,
516 Path(action_id): Path<String>,
517 OptionalRequestId(req_id): OptionalRequestId,
518 Json(req): Json<UpdateActionStatRequest>,
519) -> ClResult<(StatusCode, Json<ApiResponse<()>>)> {
520 let opts = cloudillo_types::meta_adapter::UpdateActionDataOptions {
522 comments_read: req.comments_read,
523 ..Default::default()
524 };
525
526 app.meta_adapter.update_action_data(tn_id, &action_id, &opts).await?;
527
528 info!("User {} updated stats for action {}", auth.id_tag, action_id);
529
530 let response = ApiResponse::new(()).with_req_id(req_id.unwrap_or_default());
531
532 Ok((StatusCode::OK, Json(response)))
533}
534
535pub async fn post_action_reaction(
537 State(app): State<App>,
538 tn_id: TnId,
539 IdTag(reactor_id_tag): IdTag,
540 Path(action_id): Path<String>,
541 OptionalRequestId(req_id): OptionalRequestId,
542 Json(reaction): Json<types::ReactionRequest>,
543) -> ClResult<(StatusCode, Json<ApiResponse<types::ReactionResponse>>)> {
544 let _action = app.meta_adapter.get_action(tn_id, &action_id).await?.ok_or(Error::NotFound)?;
546
547 app.meta_adapter
549 .add_reaction(
550 tn_id,
551 &action_id,
552 &reactor_id_tag,
553 &reaction.r#type,
554 reaction.content.as_deref(),
555 )
556 .await?;
557
558 let reaction_id =
560 hash("r", format!("{}:{}:{}", action_id, reactor_id_tag, reaction.r#type).as_bytes());
561
562 let reaction_response = types::ReactionResponse {
563 id: reaction_id.to_string(),
564 action_id,
565 reactor_id_tag: reactor_id_tag.into(),
566 r#type: reaction.r#type,
567 content: reaction.content,
568 created_at: cloudillo_types::types::Timestamp::now(),
569 };
570
571 let response = ApiResponse::new(reaction_response).with_req_id(req_id.unwrap_or_default());
572
573 Ok((StatusCode::CREATED, Json(response)))
574}
575
576