use axum::http::StatusCode;
use axum::routing::post;
use axum::{Json, Router};
use cano::prelude::*;
use serde::{Deserialize, Serialize};
use std::time::Duration;
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum TextPipelineState {
Parse,
Transform,
Done,
}
#[derive(Deserialize)]
struct ProcessRequest {
text: String,
}
#[derive(Serialize)]
struct ProcessResponse {
original: String,
word_count: usize,
uppercased: String,
}
struct RequestParams {
text: String,
}
impl Resource for RequestParams {}
#[derive(Clone)]
struct ParseTask;
#[task(state = TextPipelineState)]
impl ParseTask {
async fn run(&self, res: &Resources) -> Result<TaskResult<TextPipelineState>, CanoError> {
let store = res.get::<MemoryStore, str>("store")?;
let params = res.get::<RequestParams, str>("request")?;
if params.text.trim().is_empty() {
return Err(CanoError::task_execution("input text is empty"));
}
store
.put("validated_text", params.text.clone())
.map_err(|e| CanoError::store(format!("{e}")))?;
Ok(TaskResult::Single(TextPipelineState::Transform))
}
}
#[derive(Clone)]
struct TransformTask;
#[task(state = TextPipelineState)]
impl TransformTask {
async fn run(&self, res: &Resources) -> Result<TaskResult<TextPipelineState>, CanoError> {
let store = res.get::<MemoryStore, str>("store")?;
let text: String = store
.get("validated_text")
.map_err(|e| CanoError::task_execution(format!("missing validated_text: {e}")))?;
let word_count = text.split_whitespace().count();
let uppercased = text.to_uppercase();
store
.put("word_count", word_count)
.map_err(|e| CanoError::store(format!("{e}")))?;
store
.put("uppercased", uppercased)
.map_err(|e| CanoError::store(format!("{e}")))?;
Ok(TaskResult::Single(TextPipelineState::Done))
}
}
fn build_workflow(resources: Resources) -> Workflow<TextPipelineState> {
Workflow::new(resources)
.register(TextPipelineState::Parse, ParseTask)
.register(TextPipelineState::Transform, TransformTask)
.add_exit_state(TextPipelineState::Done)
.with_timeout(Duration::from_secs(5))
}
async fn process_handler(
Json(payload): Json<ProcessRequest>,
) -> Result<Json<ProcessResponse>, StatusCode> {
let store = MemoryStore::new();
let resources = Resources::new().insert("store", store.clone()).insert(
"request",
RequestParams {
text: payload.text.clone(),
},
);
let workflow = build_workflow(resources);
workflow
.orchestrate(TextPipelineState::Parse)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let word_count: usize = store
.get("word_count")
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let uppercased: String = store
.get("uppercased")
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
Ok(Json(ProcessResponse {
original: payload.text,
word_count,
uppercased,
}))
}
#[tokio::main]
async fn main() {
let app = Router::new().route("/process", post(process_handler));
let listener = tokio::net::TcpListener::bind("0.0.0.0:4001")
.await
.expect("failed to bind to port 4001");
println!("Listening on http://localhost:4001");
println!();
println!("Try:");
println!(
r#" curl -X POST http://localhost:4001/process -H "Content-Type: application/json" -d '{{"text": "Hello World From Cano"}}'"#
);
axum::serve(listener, app).await.expect("server error");
}