cronus_generator 0.6.2

The generators for cronus API spec.
Documentation
use crate::{utils::{get_path_from_optional_parent, get_request_name, get_usecase_name}, Ctxt, Generator};
use anyhow::{ bail, Result};
use convert_case::Casing;
use cronus_spec::{PythonFastApiGeneratorOption, PythonRedisGeneratorOption};
use serde::ser;




pub struct PythonRedisGenerator {
    
}

impl PythonRedisGenerator {
    pub fn new() -> Self {
        Self {}
    }
}

impl PythonRedisGenerator {
    fn get_gen_option<'a>(&self, ctx: &'a Ctxt) -> Option<&'a PythonRedisGeneratorOption> {
        ctx.spec.option.as_ref().and_then(|go| {
            go.generator
                .as_ref()
                .and_then(|gen| gen.python_redis.as_ref())
        })
    }

    fn dst(&self, ctx: &Ctxt) -> String {
        let default_file = "generated.py";

        self.get_gen_option(ctx)
            .and_then(|gen| {
            Some(get_path_from_optional_parent(
                gen.def_loc.file.parent(),
                gen.file.as_ref(),
                default_file,
            ))
            })
            .unwrap_or_else(|| default_file.into())
    }

    
}


fn async_sender_str(service_name:&str, methods: &Vec<(&String, &cronus_spec::RawUsecaseMethod, &cronus_spec::RawUsecaseMethodRedisOption)>) -> String {
    let mut method_strs = String::new();

    for (method_name, method, option) in methods {
        
        let queue_name = option.queue_name.clone().unwrap_or_else(|| {
            let snaked_name = service_name.to_case(convert_case::Case::Snake);
            let method_name = method_name.to_case(convert_case::Case::Snake);
            format!("{snaked_name}_{method_name}")
        });
        let method_str = format!(
            r#"
    async def {method_name}(self, request):
        task_data = json.dumps(asdict(request))
        await self._redis.rpush("{queue_name}", task_data)
        "#);
        method_strs.push_str(&method_str);
    }

    format!(
        r#"
class {service_name}RedisSender({service_name}):
    def __init__(self, redis: Redis):
        self._redis = redis
    
    {method_strs}
"#
    )
}

fn async_receiver_str(ctx: &crate::Ctxt, service_name:&str, methods: &Vec<(&String, &cronus_spec::RawUsecaseMethod, &cronus_spec::RawUsecaseMethodRedisOption)>) -> String {
    let mut method_strs = String::new();
    let mut listen_methods = vec![];
    for (method_name, method, option) in methods {
        let method_name = method_name.to_case(convert_case::Case::Snake);
        let queue_name = option.queue_name.clone().unwrap_or_else(|| {
            let snaked_name = service_name.to_case(convert_case::Case::Snake);
            format!("{snaked_name}_{method_name}")
        });
        let ack_queue_name = option.ack_queue_name.clone().unwrap_or_else(|| {
            format!("{queue_name}_ack")
        });
        let listen_method_name = format!("_listen_{method_name}");
        
        let request_ty = get_request_name(ctx, &method_name);

        listen_methods.push(listen_method_name.clone());
        let method_str = format!(
            r#"
    async def _listen_{method_name}(self, request):
        while True:
            try:
                task_item = await self._redis.brpoplpush(
                    "{queue_name}",
                    "{ack_queue_name}",
                )
                if task_item:
                    task_data = json.loads(task_item)
                    request = {request_ty}(**task_data)
                    await self._service.{method_name}(request)
                    await self._redis.lrem(
                        "{ack_queue_name}", 1, task_item
                    )
            except Exception as e:
                logger.error(f"Error processing send_reset_password task: {{e}}")
                await asyncio.sleep(5)
"#);
        method_strs.push_str(&method_str);
    }
    
    let create_task_stmts = listen_methods.iter().map(|method| {
        format!("self.{method}()")
    }).collect::<Vec<_>>().join(",\n");
    let start_method_str = format!(
        r#"
    def start(self):
        tasks = [{create_task_stmts}]
        for task in tasks:
            self._tasks.append(asyncio.create_task(task))
        "#);

    
    format!(
        r#"
class {service_name}RedisReceiver({service_name}):
    def __init__(self, redis: Redis, service: {service_name}):
        self._redis = redis
        self._service = service
        self._tasks = []

    {start_method_str}
    
    {method_strs}
    "#
    )
}

impl Generator for PythonRedisGenerator {
    fn name(&self) -> &'static str {
        return "python_redis"
    }


    
    fn before_all(&self, ctx: &crate::Ctxt) -> Result<()> {
        let gen_opt = self.get_gen_option(ctx);

        
        let mut common_imports = vec![
            "import json",
            "from dataclasses import asdict",
            "import logging",
        ];

        let async_flag = gen_opt
            .and_then(|gen_opt| gen_opt.async_flag)
            .unwrap_or(false);
        let redis_import = if async_flag {
            "from redis.asyncio import Redis"
        } else {
            "from redis import Redis"
        };
        if async_flag {
            common_imports.push("import asyncio");
        }
        common_imports.push(redis_import);
        common_imports.push("logger = logging.getLogger(__name__)");

        let common_imports_str = common_imports.join("\n") + "\n";
        ctx.append_file(self.name(), &self.dst(ctx), &common_imports_str);
        Ok(())
    
    }
    
    
    fn generate_usecase(&self, ctx: &crate::Ctxt, usecase_name: &str, usecase: &cronus_spec::RawUsecase) -> Result<()> {
        let usecase_from = self.get_gen_option(ctx)
            .and_then(|gen_opt| gen_opt.usecase_from.as_ref())
            .ok_or(anyhow::anyhow!("usecase_from option is not set"))?;

        let redis_methods: Vec<(&String, &cronus_spec::RawUsecaseMethod, &cronus_spec::RawUsecaseMethodRedisOption)> = usecase.methods.iter().filter_map(|(method_name, method)| {
            let redis_option = method
                .option
                .as_ref()
                .and_then(|opt| opt.redis.as_ref());
            if redis_option.is_some() {
                Some((method_name, method, redis_option.unwrap()))
            } else {
                None
            }
            
        }).collect::<Vec<_>>();

        if redis_methods.is_empty() {
            return Ok(());
        }

        let service_name = get_usecase_name(ctx, usecase_name);

        let  sender_str = async_sender_str(&service_name, &redis_methods);
        let  receiver_str = async_receiver_str(&ctx, &service_name, &redis_methods);
        let mut types_import_from_interfaces = vec![service_name.clone()];
        for (method_name, method, option) in &redis_methods {
            if method.req.is_some() {
                let request_ty = get_request_name(ctx, method_name);
                types_import_from_interfaces.push(request_ty);
            }
        }
        let import_items = types_import_from_interfaces.join(", ");
        let import_str = format!("from {usecase_from} import {import_items}");
        
        ctx.append_file(
            self.name(),
            &self.dst(ctx),
            &import_str,
        );

        ctx.append_file(
            self.name(),
            &self.dst(ctx),
            &receiver_str,
        );
        ctx.append_file(
            self.name(),
            &self.dst(ctx),
            &sender_str,
        );

        Ok(())
    }
}