use super::{PythonDtoStyle, SchemaRegistry};
use anyhow::{Result, bail};
use heck::{ToPascalCase, ToSnakeCase};
use openapiv3::{
OpenAPI, Operation, Parameter, ParameterData, ParameterSchemaOrContent, ReferenceOr, Schema, SchemaKind,
StringFormat, Type, VariantOrUnknownOrEmpty,
};
use std::collections::BTreeSet;
/// Generates Python source text (models, route handlers, entry point) from an OpenAPI document.
pub struct PythonGenerator {
// Parsed OpenAPI document that every generation step reads from.
spec: OpenAPI,
// DTO flavour used for emitted models (dataclass or msgspec).
dto: PythonDtoStyle,
// Registry built from `spec` in `new`; used to resolve schema references.
registry: SchemaRegistry,
}
/// One field of an emitted Python model class.
struct PythonFieldSpec {
// Property name exactly as written in the schema; used to de-duplicate fields across `allOf` members.
original_name: String,
// snake_case form of the property name, used as the Python attribute name.
field_name: String,
// Python type annotation rendered for the field.
type_hint: String,
// Whether the schema lists the property as required; optional fields get a `= None` default.
required: bool,
}
impl PythonGenerator {
/// Creates a generator for `spec` that emits models in the `dto` style.
///
/// The schema registry is derived from `spec` before the spec is moved into the generator.
#[must_use]
pub fn new(spec: OpenAPI, dto: PythonDtoStyle) -> Self {
    Self {
        registry: SchemaRegistry::from_spec(&spec),
        spec,
        dto,
    }
}
/// Renders the complete Python module: header, models, route handlers and entry point.
///
/// # Errors
///
/// Propagates any error raised while generating the models or the route handlers.
pub fn generate(&self) -> Result<String> {
    let header = self.generate_header();
    let models = self.generate_models()?;
    let routes = self.generate_routes()?;
    let entry_point = self.generate_main();
    Ok([header, models, routes, entry_point].concat())
}
/// Builds the preamble of the generated Python module.
///
/// Emits, in order: the `__future__` import, a ruff `noqa` directive (with `B008` added when any
/// operation declares query parameters), the "generated by" banner followed by the import for the
/// selected DTO style, the conditional `typing` / `datetime` / `uuid` imports, the `spikard`
/// import line, and the `app = Spikard()` instantiation.
fn generate_header(&self) -> String {
    // A line break inside the spec's version or title would end the `#` comment early and let
    // the remaining text land in the generated module as code, so both are flattened first.
    let single_line = |text: &str| text.replace(['\r', '\n'], " ");

    let mut header = String::new();
    header.push_str("from __future__ import annotations\n\n");

    let uses_path = self.uses_path_params();
    let uses_query = self.uses_query_params();
    let uses_body = self.uses_request_body();

    let noqa = if uses_query {
        "# ruff: noqa: B008, I001, INP001\n\n"
    } else {
        "# ruff: noqa: I001, INP001\n\n"
    };
    header.push_str(noqa);

    let uses_literal = self.uses_literal_types();
    let uses_date = self.uses_date_types();
    let uses_datetime = self.uses_datetime_types();
    let uses_uuid = self.uses_uuid_types();

    // The banner is identical for both DTO styles; only the trailing import differs.
    let dto_import = match self.dto {
        PythonDtoStyle::Dataclass => "from dataclasses import dataclass",
        PythonDtoStyle::Msgspec => "import msgspec",
    };
    header.push_str(&format!(
        "# Generated by Spikard OpenAPI code generator\n# OpenAPI Version: {}\n# Title: {}\n# DO NOT EDIT - regenerate from OpenAPI schema\n{dto_import}\n",
        single_line(self.spec.openapi.as_str()),
        single_line(self.spec.info.title.as_str()),
    ));

    if uses_literal {
        header.push_str("from typing import Literal\n");
    }
    let datetime_import = match (uses_date, uses_datetime) {
        (true, true) => Some("from datetime import date, datetime\n"),
        (true, false) => Some("from datetime import date\n"),
        (false, true) => Some("from datetime import datetime\n"),
        (false, false) => None,
    };
    if let Some(line) = datetime_import {
        header.push_str(line);
    }
    if uses_uuid {
        header.push_str("from uuid import UUID\n");
    }

    header.push_str("from spikard import ");
    header.push_str(&self.spikard_imports(uses_body, uses_path, uses_query));
    header.push_str("\n\napp = Spikard()\n\n");
    header
}
/// Renders the model section using the generator that matches the configured DTO style.
fn generate_models(&self) -> Result<String> {
    match self.dto {
        PythonDtoStyle::Msgspec => self.generate_msgspec_models(),
        PythonDtoStyle::Dataclass => self.generate_dataclass_models(),
    }
}
/// Renders every inline component schema, then every inline route model, as dataclasses.
///
/// Component entries that are themselves `$ref`s are skipped. `emitted` tracks class names so
/// each model is written at most once.
fn generate_dataclass_models(&self) -> Result<String> {
    let mut output = String::from("# Schema Models\n\n");
    let mut emitted = BTreeSet::new();

    let component_schemas = self
        .spec
        .components
        .as_ref()
        .into_iter()
        .flat_map(|components| components.schemas.iter());
    for (name, schema_ref) in component_schemas {
        if let ReferenceOr::Item(schema) = schema_ref {
            self.generate_dataclass_family(&name.to_pascal_case(), schema, &mut emitted, &mut output)?;
        }
    }

    self.generate_inline_route_models(&mut emitted, &mut output)?;
    Ok(output)
}
/// Renders every inline component schema, then every inline route model, as msgspec structs.
///
/// Component entries that are themselves `$ref`s are skipped. `emitted` tracks class names so
/// each model is written at most once.
fn generate_msgspec_models(&self) -> Result<String> {
    let mut output = String::from("# Schema Models\n\n");
    let mut emitted = BTreeSet::new();

    let component_schemas = self
        .spec
        .components
        .as_ref()
        .into_iter()
        .flat_map(|components| components.schemas.iter());
    for (name, schema_ref) in component_schemas {
        if let ReferenceOr::Item(schema) = schema_ref {
            self.generate_msgspec_family(&name.to_pascal_case(), schema, &mut emitted, &mut output)?;
        }
    }

    self.generate_inline_route_models(&mut emitted, &mut output)?;
    Ok(output)
}
/// Emits models for inline request/response bodies of every GET, POST, PUT, DELETE and PATCH
/// operation. Path items given as `$ref` are skipped.
fn generate_inline_route_models(&self, emitted: &mut BTreeSet<String>, output: &mut String) -> Result<()> {
    for path_item_ref in self.spec.paths.paths.values() {
        let path_item = match path_item_ref {
            ReferenceOr::Item(item) => item,
            ReferenceOr::Reference { .. } => continue,
        };
        let operations = [
            &path_item.get,
            &path_item.post,
            &path_item.put,
            &path_item.delete,
            &path_item.patch,
        ];
        for operation in operations.into_iter().flatten() {
            // The request model is emitted before the response model of the same operation.
            if let Some((class_name, schema)) = self.inline_request_body_model(operation) {
                self.generate_model_family(&class_name, schema, emitted, output)?;
            }
            if let Some((class_name, schema)) = self.inline_response_model(operation) {
                self.generate_model_family(&class_name, schema, emitted, output)?;
            }
        }
    }
    Ok(())
}
/// Emits `class_name` (and its nested inline models) in whichever DTO style is configured.
fn generate_model_family(
    &self,
    class_name: &str,
    schema: &Schema,
    emitted: &mut BTreeSet<String>,
    output: &mut String,
) -> Result<()> {
    if matches!(self.dto, PythonDtoStyle::Msgspec) {
        self.generate_msgspec_family(class_name, schema, emitted, output)
    } else {
        self.generate_dataclass_family(class_name, schema, emitted, output)
    }
}
/// Writes the dataclass for `class_name`, preceded by any nested inline models it needs.
///
/// Does nothing when `class_name` is already present in `emitted`.
fn generate_dataclass_family(
    &self,
    class_name: &str,
    schema: &Schema,
    emitted: &mut BTreeSet<String>,
    output: &mut String,
) -> Result<()> {
    let first_time = emitted.insert(class_name.to_owned());
    if first_time {
        // Nested models go first so they are defined before the class that refers to them.
        self.generate_nested_families(class_name, schema, emitted, output)?;
        let rendered = self.generate_dataclass(class_name, schema)?;
        output.push_str(&rendered);
        output.push('\n');
    }
    Ok(())
}
/// Writes the msgspec struct for `class_name`, preceded by any nested inline models it needs.
///
/// Does nothing when `class_name` is already present in `emitted`.
fn generate_msgspec_family(
    &self,
    class_name: &str,
    schema: &Schema,
    emitted: &mut BTreeSet<String>,
    output: &mut String,
) -> Result<()> {
    let first_time = emitted.insert(class_name.to_owned());
    if first_time {
        // Nested models go first so they are defined before the class that refers to them.
        self.generate_nested_families(class_name, schema, emitted, output)?;
        let rendered = self.generate_msgspec_struct(class_name, schema)?;
        output.push_str(&rendered);
        output.push('\n');
    }
    Ok(())
}
/// Emits models for inline object properties (and inline object array items) of `schema`.
///
/// For `allOf` schemas every member is visited with the same `parent_class_name`; referenced
/// members are resolved through the component schemas and silently skipped when unresolved.
/// Properties given as `$ref` are ignored here.
fn generate_nested_families(
    &self,
    parent_class_name: &str,
    schema: &Schema,
    emitted: &mut BTreeSet<String>,
    output: &mut String,
) -> Result<()> {
    match &schema.schema_kind {
        SchemaKind::Type(Type::Object(obj)) => {
            for (prop_name, prop_schema_ref) in &obj.properties {
                let ReferenceOr::Item(prop_schema) = prop_schema_ref else {
                    continue;
                };
                if let Some(class_name) = self.inline_model_name(parent_class_name, prop_name, prop_schema) {
                    self.generate_model_family(&class_name, prop_schema, emitted, output)?;
                }
                let array_item_model = self
                    .inline_array_item_model_name(parent_class_name, prop_name, prop_schema)
                    .zip(inline_array_item_schema(prop_schema));
                if let Some((array_item_name, item_schema)) = array_item_model {
                    self.generate_model_family(&array_item_name, item_schema, emitted, output)?;
                }
            }
        }
        SchemaKind::AllOf { all_of } => {
            for member_ref in all_of {
                let member = match member_ref {
                    ReferenceOr::Item(inline) => Some(inline),
                    ReferenceOr::Reference { reference } => self.resolve_schema_reference(reference),
                };
                if let Some(member) = member {
                    self.generate_nested_families(parent_class_name, member, emitted, output)?;
                }
            }
        }
        _ => {}
    }
    Ok(())
}
fn generate_dataclass(&self, class_name: &str, schema: &Schema) -> Result<String> {
if self.dto != PythonDtoStyle::Dataclass {
bail!("dataclass generation called for non-dataclass style");
}
let mut output = String::new();
output.push_str("@dataclass(slots=True, kw_only=True)\n");
output.push_str(&format!("class {class_name}:\n"));
output.push_str(&format!(
" \"\"\"{}\"\"\"\n",
ensure_sentence(
schema
.schema_data
.description
.as_deref()
.map(str::trim)
.filter(|value| !value.is_empty())
.unwrap_or("Generated OpenAPI schema model.")
)
));
let mut fields = Vec::new();
self.collect_model_fields_into(class_name, schema, &mut fields);
if !fields.is_empty() {
fields.sort_by_key(|field| !field.required);
for field in fields {
if field.required {
output.push_str(&format!(" {}: {}\n", field.field_name, field.type_hint));
} else {
output.push_str(&format!(" {}: {} = None\n", field.field_name, field.type_hint));
}
}
}
Ok(output)
}
fn generate_msgspec_struct(&self, name: &str, schema: &Schema) -> Result<String> {
let class_name = name.to_pascal_case();
let mut output = String::new();
output.push_str(&format!("class {class_name}(msgspec.Struct):\n"));
output.push_str(&format!(
" \"\"\"{}\"\"\"\n",
ensure_sentence(
schema
.schema_data
.description
.as_deref()
.map(str::trim)
.filter(|value| !value.is_empty())
.unwrap_or("Generated OpenAPI schema model.")
)
));
let mut fields = Vec::new();
self.collect_model_fields_into(&class_name, schema, &mut fields);
if !fields.is_empty() {
fields.sort_by_key(|field| !field.required);
for field in fields {
if field.required {
output.push_str(&format!(" {}: {}\n", field.field_name, field.type_hint));
} else {
output.push_str(&format!(" {}: {} = None\n", field.field_name, field.type_hint));
}
}
}
Ok(output)
}
/// Appends one `PythonFieldSpec` per property of `schema` to `fields`.
///
/// Object schemas contribute their own properties; `allOf` schemas contribute the properties of
/// every member (referenced members are resolved, unresolved ones skipped). A property whose
/// original name is already in `fields` is not added again, so the first occurrence wins.
fn collect_model_fields_into(&self, parent_class_name: &str, schema: &Schema, fields: &mut Vec<PythonFieldSpec>) {
    match &schema.schema_kind {
        SchemaKind::Type(Type::Object(obj)) => {
            for (prop_name, prop_schema_ref) in &obj.properties {
                let already_collected = fields.iter().any(|field| &field.original_name == prop_name);
                if already_collected {
                    continue;
                }
                let required = obj.required.contains(prop_name);
                // Non-required properties are rendered as optional (`| None`).
                let type_hint =
                    self.python_type_from_boxed_schema_ref(parent_class_name, prop_name, prop_schema_ref, !required);
                fields.push(PythonFieldSpec {
                    original_name: prop_name.clone(),
                    field_name: prop_name.to_snake_case(),
                    type_hint,
                    required,
                });
            }
        }
        SchemaKind::AllOf { all_of } => {
            for member_ref in all_of {
                let member = match member_ref {
                    ReferenceOr::Item(inline) => Some(inline),
                    ReferenceOr::Reference { reference } => self.resolve_schema_reference(reference),
                };
                if let Some(member) = member {
                    self.collect_model_fields_into(parent_class_name, member, fields);
                }
            }
        }
        _ => {}
    }
}
/// Looks up the component schema named by the last `/`-separated segment of `reference`.
///
/// Returns `None` when the spec has no components, the name is unknown, or the component entry
/// is itself a `$ref` (references are not followed transitively).
fn resolve_schema_reference<'a>(&'a self, reference: &str) -> Option<&'a Schema> {
    let name = reference.rsplit('/').next()?;
    let components = self.spec.components.as_ref()?;
    match components.schemas.get(name)? {
        ReferenceOr::Item(schema) => Some(schema),
        ReferenceOr::Reference { .. } => None,
    }
}
/// Renders the non-optional Python type of a body schema that has no parent class or field
/// name; `inline_name` names the class used when the schema is an inline object.
fn extract_type_from_schema_ref(&self, schema_ref: &ReferenceOr<Schema>, inline_name: Option<&str>) -> String {
    let no_parent: Option<&str> = None;
    let no_field: Option<&str> = None;
    self.python_type_from_schema_ref(no_parent, no_field, schema_ref, false, inline_name)
}
/// Returns the Python type of the operation's JSON request body, if it has one.
///
/// A referenced request body yields the PascalCase form of the reference's last segment. An
/// inline body yields the type of its `application/json` schema, using
/// `{OperationId}RequestBody` as the inline class name when an operation id exists.
fn extract_request_body_type(&self, operation: &Operation) -> Option<String> {
    let body_ref = operation.request_body.as_ref()?;
    match body_ref {
        ReferenceOr::Reference { reference } => {
            // `rsplit` always yields at least one segment, so the fallback is never taken.
            let ref_name = reference.rsplit('/').next().unwrap_or(reference.as_str());
            Some(ref_name.to_pascal_case())
        }
        ReferenceOr::Item(request_body) => {
            let schema_ref = request_body.content.get("application/json")?.schema.as_ref()?;
            let inline_name = operation
                .operation_id
                .as_deref()
                .map(|id| format!("{}RequestBody", id.to_pascal_case()));
            Some(self.extract_type_from_schema_ref(schema_ref, inline_name.as_deref()))
        }
    }
}
/// Returns the Python return type for the operation's success response.
///
/// The first of the `200`, `201` and `2XX` responses that exists is used. A referenced response
/// yields the PascalCase form of the reference's last segment; an inline response yields the
/// type of its `application/json` schema, using `{OperationId}ResponseBody` as the inline class
/// name. Everything else falls back to `dict[str, object]`.
fn extract_response_type(&self, operation: &Operation) -> String {
    use openapiv3::StatusCode;

    let responses = &operation.responses.responses;
    let success = [StatusCode::Code(200), StatusCode::Code(201), StatusCode::Range(2)]
        .iter()
        .find_map(|status| responses.get(status));

    let rendered = match success {
        Some(ReferenceOr::Reference { reference }) => {
            // `rsplit` always yields at least one segment, so the fallback is never taken.
            let ref_name = reference.rsplit('/').next().unwrap_or(reference.as_str());
            Some(ref_name.to_pascal_case())
        }
        Some(ReferenceOr::Item(response)) => response
            .content
            .get("application/json")
            .and_then(|content| content.schema.as_ref())
            .map(|schema_ref| {
                let inline_name = operation
                    .operation_id
                    .as_deref()
                    .map(|id| format!("{}ResponseBody", id.to_pascal_case()));
                self.extract_type_from_schema_ref(schema_ref, inline_name.as_deref())
            }),
        None => None,
    };
    rendered.unwrap_or_else(|| "dict[str, object]".to_string())
}
/// Renders the Python type for an inline schema or a schema `$ref`.
///
/// References are rendered by name; inline schemas are rendered structurally with the given
/// naming context.
fn python_type_from_schema_ref(
    &self,
    parent_class_name: Option<&str>,
    field_name: Option<&str>,
    schema_ref: &ReferenceOr<Schema>,
    optional: bool,
    inline_name: Option<&str>,
) -> String {
    let schema = match schema_ref {
        ReferenceOr::Reference { reference } => return self.python_type_from_reference(reference, optional),
        ReferenceOr::Item(schema) => schema,
    };
    self.schema_to_python_type(parent_class_name, field_name, schema, optional, inline_name)
}
/// Renders the Python type of an object property (boxed schema or `$ref`).
///
/// Inline schemas are rendered with `parent_class_name` and `field_name` as naming context and
/// no explicit inline name.
fn python_type_from_boxed_schema_ref(
    &self,
    parent_class_name: &str,
    field_name: &str,
    schema_ref: &ReferenceOr<Box<Schema>>,
    optional: bool,
) -> String {
    let schema = match schema_ref {
        ReferenceOr::Reference { reference } => return self.python_type_from_reference(reference, optional),
        ReferenceOr::Item(schema) => schema,
    };
    self.schema_to_python_type(Some(parent_class_name), Some(field_name), schema, optional, None)
}
fn python_type_from_reference(&self, reference: &str, optional: bool) -> String {
let mut base = reference.split('/').next_back().unwrap().to_pascal_case();
if let Some(schema) = self.registry.resolve_reference(reference)
&& schema.schema_data.nullable
{
base = self.append_optional(base, true);
}
self.append_optional(base, optional)
}
/// Renders the Python type annotation for an inline schema.
///
/// Enumerations become `Literal[...]`; strings, numbers, integers and booleans map to their
/// Python counterparts; arrays become `list[...]`. An object with properties is named by
/// `inline_name` if given, otherwise `{parent}{Field}` when both are known; every other shape
/// is `dict[str, object]`. `| None` is appended for nullable schemas and when `optional` is set.
#[allow(clippy::only_used_in_recursion)]
fn schema_to_python_type(
    &self,
    parent_class_name: Option<&str>,
    field_name: Option<&str>,
    schema: &Schema,
    optional: bool,
    inline_name: Option<&str>,
) -> String {
    let base_type = match self.literal_type(schema) {
        Some(literal) => literal,
        None => match &schema.schema_kind {
            SchemaKind::Type(Type::String(string_type)) => self.string_format_python_type(string_type),
            SchemaKind::Type(Type::Number(_)) => "float".to_string(),
            SchemaKind::Type(Type::Integer(_)) => "int".to_string(),
            SchemaKind::Type(Type::Boolean(_)) => "bool".to_string(),
            SchemaKind::Type(Type::Array(arr)) => {
                let item_type = arr.items.as_ref().map_or_else(
                    || "dict[str, object]".to_string(),
                    |item_schema| self.python_array_item_type(parent_class_name, field_name, item_schema),
                );
                format!("list[{item_type}]")
            }
            SchemaKind::Type(Type::Object(obj)) if !obj.properties.is_empty() => {
                match (inline_name, parent_class_name, field_name) {
                    (Some(name), _, _) => name.to_owned(),
                    (None, Some(parent), Some(field)) => format!("{parent}{}", field.to_pascal_case()),
                    _ => "dict[str, object]".to_string(),
                }
            }
            // Property-less objects and every remaining schema kind.
            _ => "dict[str, object]".to_string(),
        },
    };
    let base_type = self.append_optional(base_type, schema.schema_data.nullable);
    self.append_optional(base_type, optional)
}
/// Renders `Literal[...]` for a string, integer or number schema that declares enum values.
///
/// Null entries in the enumeration are skipped. String values are JSON-encoded (so they come
/// out double-quoted and escaped); if any value fails to encode the result is `None`. Returns
/// `None` as well when no values remain or the schema is of another kind.
fn literal_type(&self, schema: &Schema) -> Option<String> {
    // Shared tail: an empty value list means "no literal".
    fn render(values: Vec<String>) -> Option<String> {
        if values.is_empty() {
            None
        } else {
            Some(format!("Literal[{}]", values.join(", ")))
        }
    }

    let SchemaKind::Type(schema_type) = &schema.schema_kind else {
        return None;
    };
    match schema_type {
        Type::String(string_type) => {
            let mut values = Vec::new();
            for value in string_type.enumeration.iter().flatten() {
                values.push(serde_json::to_string(value).ok()?);
            }
            render(values)
        }
        Type::Integer(integer_type) => render(
            integer_type
                .enumeration
                .iter()
                .flatten()
                .map(ToString::to_string)
                .collect(),
        ),
        // NOTE(review): float members inside `Literal[...]` may be rejected by Python type
        // checkers — confirm this output is intended.
        Type::Number(number_type) => render(
            number_type
                .enumeration
                .iter()
                .flatten()
                .map(ToString::to_string)
                .collect(),
        ),
        _ => None,
    }
}
/// Maps a string schema's `format` to a Python type name: `date`, `datetime`, `bytes` (for
/// `byte`/`binary`), `UUID` (for the unknown format `uuid`), or `str` for anything else.
fn string_format_python_type(&self, string_type: &openapiv3::StringType) -> String {
    let python_type = match &string_type.format {
        VariantOrUnknownOrEmpty::Item(StringFormat::Date) => "date",
        VariantOrUnknownOrEmpty::Item(StringFormat::DateTime) => "datetime",
        VariantOrUnknownOrEmpty::Item(StringFormat::Byte | StringFormat::Binary) => "bytes",
        VariantOrUnknownOrEmpty::Unknown(format) if format == "uuid" => "UUID",
        _ => "str",
    };
    python_type.to_owned()
}
/// Appends ` | None` to `base` when `optional` is set, unless `base` (ignoring surrounding
/// whitespace) already ends with `| None`.
fn append_optional(&self, base: String, optional: bool) -> String {
    if !optional || base.trim().ends_with("| None") {
        return base;
    }
    format!("{base} | None")
}
fn python_array_item_type(
&self,
parent_class_name: Option<&str>,
field_name: Option<&str>,
schema_ref: &ReferenceOr<Box<Schema>>,
) -> String {
match schema_ref {
ReferenceOr::Item(schema) => {
let inline_item_name = parent_class_name
.zip(field_name)
.and_then(|(parent, field)| self.inline_array_item_model_name(parent, field, schema));
self.schema_to_python_type(None, None, schema, false, inline_item_name.as_deref())
}
ReferenceOr::Reference { reference } => self.python_type_from_reference(reference, false),
}
}
/// Returns `{Parent}{Field}` when `schema` is an object with at least one property; `None`
/// for every other schema.
fn inline_model_name(&self, parent_class_name: &str, field_name: &str, schema: &Schema) -> Option<String> {
    let SchemaKind::Type(Type::Object(obj)) = &schema.schema_kind else {
        return None;
    };
    if obj.properties.is_empty() {
        return None;
    }
    Some(format!("{parent_class_name}{}", field_name.to_pascal_case()))
}
/// Returns `{Parent}{Field}Item` when `schema` is an array whose inline item schema is an
/// object with at least one property; `None` otherwise.
fn inline_array_item_model_name(
    &self,
    parent_class_name: &str,
    field_name: &str,
    schema: &Schema,
) -> Option<String> {
    let item_schema = inline_array_item_schema(schema)?;
    let SchemaKind::Type(Type::Object(obj)) = &item_schema.schema_kind else {
        return None;
    };
    if obj.properties.is_empty() {
        return None;
    }
    Some(format!("{parent_class_name}{}Item", field_name.to_pascal_case()))
}
/// Returns the class name and schema of the operation's inline JSON request body model.
///
/// Yields `Some` only when the operation has an id, its request body and the body's
/// `application/json` schema are both inline, and that schema is an object with properties.
/// The class is named `{OperationId}RequestBody`.
fn inline_request_body_model<'a>(&self, operation: &'a Operation) -> Option<(String, &'a Schema)> {
    let operation_id = operation.operation_id.as_deref()?;
    let ReferenceOr::Item(body) = operation.request_body.as_ref()? else {
        return None;
    };
    let ReferenceOr::Item(schema) = body.content.get("application/json")?.schema.as_ref()? else {
        return None;
    };
    if !is_named_inline_object_schema(schema) {
        return None;
    }
    Some((format!("{}RequestBody", operation_id.to_pascal_case()), schema))
}
/// Returns the class name and schema of the operation's inline JSON success-response model.
///
/// The first existing response among `200`, `201` and `2XX` is inspected. Yields `Some` only
/// when the operation has an id, that response and its `application/json` schema are both
/// inline, and the schema is an object with properties. The class is named
/// `{OperationId}ResponseBody`.
fn inline_response_model<'a>(&self, operation: &'a Operation) -> Option<(String, &'a Schema)> {
    use openapiv3::StatusCode;

    let operation_id = operation.operation_id.as_deref()?;
    let responses = &operation.responses.responses;
    let success = [StatusCode::Code(200), StatusCode::Code(201), StatusCode::Range(2)]
        .iter()
        .find_map(|status| responses.get(status))?;

    let ReferenceOr::Item(response) = success else {
        return None;
    };
    let ReferenceOr::Item(schema) = response.content.get("application/json")?.schema.as_ref()? else {
        return None;
    };
    if !is_named_inline_object_schema(schema) {
        return None;
    }
    Some((format!("{}ResponseBody", operation_id.to_pascal_case()), schema))
}
/// Reports whether any GET, POST, PUT, DELETE or PATCH operation of an inline path item
/// declares an inline path parameter.
fn uses_path_params(&self) -> bool {
    self.spec.paths.paths.values().any(|path_item_ref| {
        let ReferenceOr::Item(path_item) = path_item_ref else {
            return false;
        };
        [
            &path_item.get,
            &path_item.post,
            &path_item.put,
            &path_item.delete,
            &path_item.patch,
        ]
        .into_iter()
        .flatten()
        .any(operation_has_path_params)
    })
}
/// Reports whether any GET, POST, PUT, DELETE or PATCH operation of an inline path item
/// declares an inline query parameter.
fn uses_query_params(&self) -> bool {
    self.spec.paths.paths.values().any(|path_item_ref| {
        let ReferenceOr::Item(path_item) = path_item_ref else {
            return false;
        };
        [
            &path_item.get,
            &path_item.post,
            &path_item.put,
            &path_item.delete,
            &path_item.patch,
        ]
        .into_iter()
        .flatten()
        .any(operation_has_query_params)
    })
}
/// Reports whether any GET, POST, PUT, DELETE or PATCH operation of an inline path item has
/// a request body.
fn uses_request_body(&self) -> bool {
    self.spec.paths.paths.values().any(|path_item_ref| {
        let ReferenceOr::Item(path_item) = path_item_ref else {
            return false;
        };
        [
            &path_item.get,
            &path_item.post,
            &path_item.put,
            &path_item.delete,
            &path_item.patch,
        ]
        .into_iter()
        .flatten()
        .any(operation_has_request_body)
    })
}
/// Reports whether the generated module needs `Literal`: true when any component schema
/// (resolved through the registry) or any operation of an inline path item uses an enum type.
fn uses_literal_types(&self) -> bool {
    let in_components = self.spec.components.as_ref().is_some_and(|components| {
        components
            .schemas
            .values()
            .filter_map(|schema_ref| self.registry.resolve(schema_ref))
            .any(|schema| self.schema_uses_literal_type(schema))
    });
    if in_components {
        return true;
    }

    self.spec.paths.paths.values().any(|path_item_ref| {
        let ReferenceOr::Item(path_item) = path_item_ref else {
            return false;
        };
        [
            &path_item.get,
            &path_item.post,
            &path_item.put,
            &path_item.delete,
            &path_item.patch,
        ]
        .into_iter()
        .flatten()
        .any(|operation| self.operation_uses_literal_types(operation))
    })
}
/// Reports whether any schema reachable from the spec is a `date`-formatted string.
fn uses_date_types(&self) -> bool {
    let target = PythonSpecialStringType::Date;
    self.uses_special_string_type(target)
}
/// Reports whether any schema reachable from the spec is a `date-time`-formatted string.
fn uses_datetime_types(&self) -> bool {
    let target = PythonSpecialStringType::DateTime;
    self.uses_special_string_type(target)
}
/// Reports whether any schema reachable from the spec is a `uuid`-formatted string.
fn uses_uuid_types(&self) -> bool {
    let target = PythonSpecialStringType::Uuid;
    self.uses_special_string_type(target)
}
/// Reports whether any component schema (resolved through the registry) or any operation of
/// an inline path item uses a string of the `target` format.
fn uses_special_string_type(&self, target: PythonSpecialStringType) -> bool {
    let in_components = self.spec.components.as_ref().is_some_and(|components| {
        components
            .schemas
            .values()
            .filter_map(|schema_ref| self.registry.resolve(schema_ref))
            .any(|schema| self.schema_uses_special_string_type(schema, target))
    });
    if in_components {
        return true;
    }

    self.spec.paths.paths.values().any(|path_item_ref| {
        let ReferenceOr::Item(path_item) = path_item_ref else {
            return false;
        };
        [
            &path_item.get,
            &path_item.post,
            &path_item.put,
            &path_item.delete,
            &path_item.patch,
        ]
        .into_iter()
        .flatten()
        .any(|operation| self.operation_uses_special_string_type(operation, target))
    })
}
fn operation_uses_literal_types(&self, operation: &Operation) -> bool {
operation.parameters.iter().any(|parameter_ref| {
let ReferenceOr::Item(parameter) = parameter_ref else {
return false;
};
match parameter {
Parameter::Path { parameter_data, .. }
| Parameter::Query { parameter_data, .. }
| Parameter::Header { parameter_data, .. }
| Parameter::Cookie { parameter_data, .. } => self.parameter_uses_literal_type(parameter_data),
}
}) || self
.inline_request_body_model(operation)
.is_some_and(|(_, schema)| self.schema_uses_literal_type(schema))
|| self
.inline_response_model(operation)
.is_some_and(|(_, schema)| self.schema_uses_literal_type(schema))
|| operation
.request_body
.as_ref()
.and_then(|body_ref| match body_ref {
ReferenceOr::Item(request_body) => request_body
.content
.get("application/json")
.and_then(|media_type| media_type.schema.as_ref())
.and_then(|schema_ref| self.registry.resolve(schema_ref)),
ReferenceOr::Reference { .. } => None,
})
.is_some_and(|schema| self.schema_uses_literal_type(schema))
}
/// Reports whether the parameter's schema (resolved through the registry) uses an enum type.
///
/// Parameters described by `content` rather than `schema` never count.
fn parameter_uses_literal_type(&self, parameter_data: &ParameterData) -> bool {
    // Restores the `&parameter_data` borrow that had been corrupted into `¶meter_data`.
    let ParameterSchemaOrContent::Schema(schema_ref) = &parameter_data.format else {
        return false;
    };
    self.registry
        .resolve(schema_ref)
        .is_some_and(|schema| self.schema_uses_literal_type(schema))
}
/// Reports whether `operation` uses a string of the `target` format in any inline parameter,
/// in its inline `application/json` request body, or in the `application/json` schema of any
/// inline response. Schemas are resolved through the registry before being inspected.
fn operation_uses_special_string_type(&self, operation: &Operation, target: PythonSpecialStringType) -> bool {
    let in_parameters = operation.parameters.iter().any(|parameter_ref| match parameter_ref {
        ReferenceOr::Item(
            Parameter::Path { parameter_data, .. }
            | Parameter::Query { parameter_data, .. }
            | Parameter::Header { parameter_data, .. }
            | Parameter::Cookie { parameter_data, .. },
        ) => self.parameter_uses_special_string_type(parameter_data, target),
        ReferenceOr::Reference { .. } => false,
    });
    if in_parameters {
        return true;
    }

    let body_schema = match operation.request_body.as_ref() {
        Some(ReferenceOr::Item(request_body)) => request_body
            .content
            .get("application/json")
            .and_then(|media_type| media_type.schema.as_ref())
            .and_then(|schema_ref| self.registry.resolve(schema_ref)),
        _ => None,
    };
    if body_schema.is_some_and(|schema| self.schema_uses_special_string_type(schema, target)) {
        return true;
    }

    operation.responses.responses.values().any(|response_ref| {
        let ReferenceOr::Item(response) = response_ref else {
            return false;
        };
        response
            .content
            .get("application/json")
            .and_then(|media_type| media_type.schema.as_ref())
            .and_then(|schema_ref| self.registry.resolve(schema_ref))
            .is_some_and(|schema| self.schema_uses_special_string_type(schema, target))
    })
}
/// Reports whether the parameter's schema (resolved through the registry) uses a string of
/// the `target` format.
///
/// Parameters described by `content` rather than `schema` never count.
fn parameter_uses_special_string_type(
    &self,
    parameter_data: &ParameterData,
    target: PythonSpecialStringType,
) -> bool {
    // Restores the `&parameter_data` borrow that had been corrupted into `¶meter_data`.
    let ParameterSchemaOrContent::Schema(schema_ref) = &parameter_data.format else {
        return false;
    };
    self.registry
        .resolve(schema_ref)
        .is_some_and(|schema| self.schema_uses_special_string_type(schema, target))
}
fn schema_uses_literal_type(&self, schema: &Schema) -> bool {
if self.literal_type(schema).is_some() {
return true;
}
match &schema.schema_kind {
SchemaKind::Type(Type::Array(array_type)) => array_type
.items
.as_ref()
.and_then(|item_schema| match item_schema {
ReferenceOr::Item(item) => Some(item.as_ref()),
ReferenceOr::Reference { reference } => self.resolve_schema_reference(reference),
})
.is_some_and(|item| self.schema_uses_literal_type(item)),
SchemaKind::Type(Type::Object(object_type)) => {
object_type.properties.values().any(|schema_ref| match schema_ref {
ReferenceOr::Item(schema) => self.schema_uses_literal_type(schema),
ReferenceOr::Reference { reference } => self
.resolve_schema_reference(reference)
.is_some_and(|schema| self.schema_uses_literal_type(schema)),
})
}
SchemaKind::AllOf { all_of } => all_of.iter().any(|schema_ref| match schema_ref {
ReferenceOr::Item(schema) => self.schema_uses_literal_type(schema),
ReferenceOr::Reference { reference } => self
.resolve_schema_reference(reference)
.is_some_and(|schema| self.schema_uses_literal_type(schema)),
}),
_ => false,
}
}
fn schema_uses_special_string_type(&self, schema: &Schema, target: PythonSpecialStringType) -> bool {
if self.schema_matches_special_string_type(schema, target) {
return true;
}
match &schema.schema_kind {
SchemaKind::Type(Type::Array(array_type)) => array_type
.items
.as_ref()
.and_then(|item_schema| match item_schema {
ReferenceOr::Item(item) => Some(item.as_ref()),
ReferenceOr::Reference { reference } => self.resolve_schema_reference(reference),
})
.is_some_and(|item| self.schema_uses_special_string_type(item, target)),
SchemaKind::Type(Type::Object(object_type)) => {
object_type.properties.values().any(|schema_ref| match schema_ref {
ReferenceOr::Item(schema) => self.schema_uses_special_string_type(schema, target),
ReferenceOr::Reference { reference } => self
.resolve_schema_reference(reference)
.is_some_and(|schema| self.schema_uses_special_string_type(schema, target)),
})
}
SchemaKind::AllOf { all_of } => all_of.iter().any(|schema_ref| match schema_ref {
ReferenceOr::Item(schema) => self.schema_uses_special_string_type(schema, target),
ReferenceOr::Reference { reference } => self
.resolve_schema_reference(reference)
.is_some_and(|schema| self.schema_uses_special_string_type(schema, target)),
}),
_ => false,
}
}
/// Reports whether `schema` itself is a string schema whose format corresponds to `target`
/// (`date`, `date-time`, or the unknown format `uuid`).
fn schema_matches_special_string_type(&self, schema: &Schema, target: PythonSpecialStringType) -> bool {
    let SchemaKind::Type(Type::String(string_type)) = &schema.schema_kind else {
        return false;
    };
    let format = &string_type.format;
    match target {
        PythonSpecialStringType::Date => {
            matches!(format, VariantOrUnknownOrEmpty::Item(StringFormat::Date))
        }
        PythonSpecialStringType::DateTime => {
            matches!(format, VariantOrUnknownOrEmpty::Item(StringFormat::DateTime))
        }
        PythonSpecialStringType::Uuid => {
            matches!(format, VariantOrUnknownOrEmpty::Unknown(name) if name == "uuid")
        }
    }
}
/// Renders the Python type hint of a parameter.
///
/// Schema-described parameters use the schema's type; content-described parameters fall back
/// to `dict[str, object]`. Either way the hint is made optional when the parameter is not required.
fn parameter_type_hint(&self, parameter_data: &ParameterData) -> String {
    // Restores the `&parameter_data` borrow that had been corrupted into `¶meter_data`.
    match &parameter_data.format {
        ParameterSchemaOrContent::Schema(schema_ref) => {
            self.python_type_from_schema_ref(None, None, schema_ref, !parameter_data.required, None)
        }
        ParameterSchemaOrContent::Content(_) => {
            self.append_optional("dict[str, object]".to_string(), !parameter_data.required)
        }
    }
}
/// Builds the comma-separated name list for `from spikard import ...`.
///
/// `Body`, `Path` and `Query` are included only when used; `Request`, `Spikard` and `route`
/// always follow.
fn spikard_imports(&self, uses_body: bool, uses_path: bool, uses_query: bool) -> String {
    let conditional = [(uses_body, "Body"), (uses_path, "Path"), (uses_query, "Query")];
    conditional
        .into_iter()
        .filter_map(|(enabled, name)| enabled.then_some(name))
        .chain(["Request", "Spikard", "route"])
        .collect::<Vec<_>>()
        .join(", ")
}
/// Renders a handler stub for every GET, POST, PUT, DELETE and PATCH operation, in that order
/// within each path. Path items given as `$ref` are skipped.
fn generate_routes(&self) -> Result<String> {
    let mut output = String::from("\n# Route Handlers\n\n");
    for (path, path_item_ref) in &self.spec.paths.paths {
        let ReferenceOr::Item(path_item) = path_item_ref else {
            continue;
        };
        let operations = [
            ("get", &path_item.get),
            ("post", &path_item.post),
            ("put", &path_item.put),
            ("delete", &path_item.delete),
            ("patch", &path_item.patch),
        ];
        for (method, operation) in operations {
            if let Some(op) = operation {
                let handler = self.generate_route_handler(path, method, op)?;
                output.push_str(&handler);
            }
        }
    }
    Ok(output)
}
/// Renders one `@route`-decorated Python handler stub for `operation`.
///
/// The function is named after the snake_case operation id, or `{method}_{path}` when there is
/// none. In that fallback, `/` becomes `_`, braces are dropped, and every remaining character
/// that is neither alphanumeric nor `_` is replaced by `_`, so paths such as `/user-items`
/// still yield a valid Python identifier. Parameters are emitted in the order: `request`,
/// path parameters, required query parameters, optional query parameters (defaulting to
/// `Query(default=None)`), then the request body. The stub raises `NotImplementedError`.
fn generate_route_handler(&self, path: &str, method: &str, operation: &Operation) -> Result<String> {
    let mut output = String::new();
    let func_name = operation
        .operation_id
        .as_ref()
        .map(|id| id.to_snake_case())
        .unwrap_or_else(|| {
            let stem: String = path
                .replace('/', "_")
                .replace(['{', '}'], "")
                .trim_matches('_')
                .chars()
                // Hyphens, dots and similar are legal in URL paths but not in identifiers.
                .map(|ch| if ch.is_alphanumeric() || ch == '_' { ch } else { '_' })
                .collect();
            format!("{method}_{stem}")
        });

    let mut path_params = Vec::new();
    let mut query_params = Vec::new();
    for param_ref in &operation.parameters {
        if let ReferenceOr::Item(param) = param_ref {
            match param {
                Parameter::Path { parameter_data, .. } => {
                    path_params.push((parameter_data.name.clone(), self.parameter_type_hint(parameter_data)));
                }
                Parameter::Query { parameter_data, .. } => {
                    let type_hint = self.parameter_type_hint(parameter_data);
                    query_params.push((parameter_data.name.clone(), type_hint, parameter_data.required));
                }
                // Header and cookie parameters are not part of the generated signature.
                _ => {}
            }
        }
    }

    let body_type = self.extract_request_body_type(operation);
    let return_type = self.extract_response_type(operation);

    output.push_str(&format!(
        "@route(\"{}\", methods=[\"{}\"])\n",
        path,
        method.to_uppercase()
    ));
    output.push_str(&format!("def {func_name}(request: Request"));
    for (param_name, param_type) in &path_params {
        output.push_str(&format!(", {}: Path[{}]", param_name.to_snake_case(), param_type));
    }
    // Required query parameters go first: Python forbids non-default arguments after defaults.
    for (param_name, param_type, _) in query_params.iter().filter(|(_, _, required)| *required) {
        output.push_str(&format!(", {}: Query[{}]", param_name.to_snake_case(), param_type));
    }
    for (param_name, param_type, _) in query_params.iter().filter(|(_, _, required)| !*required) {
        output.push_str(&format!(
            ", {}: Query[{}] = Query(default=None)",
            param_name.to_snake_case(),
            param_type
        ));
    }
    if let Some(body_type_name) = &body_type {
        output.push_str(&format!(", body: Body[{body_type_name}]"));
    }
    output.push_str(&format!(") -> {return_type}:\n"));

    let docstring =
        summarize_operation_doc(operation).unwrap_or_else(|| format!("Handle {} {}.", method.to_uppercase(), path));
    output.push_str(&format!(" \"\"\"{docstring}\"\"\"\n"));
    output.push_str(" raise NotImplementedError(\"TODO: Implement this endpoint\")\n\n");
    Ok(output)
}
/// Returns the `if __name__ == "__main__":` footer that starts the app on `0.0.0.0:8000`.
///
/// The two statements under the `if` are indented by four spaces; without indentation the
/// generated module is rejected by Python with an `IndentationError`. The text is built from
/// escaped line literals so the indentation cannot be lost to source reformatting.
fn generate_main(&self) -> String {
    [
        "\n",
        "# Run the application\n",
        "if __name__ == \"__main__\":\n",
        "    from spikard.config import ServerConfig\n",
        "    app.run(config=ServerConfig(host=\"0.0.0.0\", port=8000))\n",
    ]
    .concat()
}
}
/// Reports whether `schema` is an object schema that declares at least one property.
fn is_named_inline_object_schema(schema: &Schema) -> bool {
    match &schema.schema_kind {
        SchemaKind::Type(Type::Object(obj)) => !obj.properties.is_empty(),
        _ => false,
    }
}
/// Returns the inline item schema of an array schema.
///
/// Yields `None` when `schema` is not an array, declares no items, or its items are a `$ref`.
fn inline_array_item_schema(schema: &Schema) -> Option<&Schema> {
    let SchemaKind::Type(Type::Array(array_type)) = &schema.schema_kind else {
        return None;
    };
    match array_type.items.as_ref()? {
        ReferenceOr::Item(item_schema) => Some(&**item_schema),
        ReferenceOr::Reference { .. } => None,
    }
}
/// Builds a one-line docstring from the operation's summary, or its description when there is
/// no summary.
///
/// Whitespace runs are collapsed to single spaces and a trailing `.` is added unless the text
/// already ends in `.`, `!` or `?`. Returns `None` when the chosen text is missing or blank.
fn summarize_operation_doc(operation: &Operation) -> Option<String> {
    let text = operation
        .summary
        .as_deref()
        .or(operation.description.as_deref())?
        .trim();
    if text.is_empty() {
        return None;
    }
    let mut collapsed = text.split_whitespace().collect::<Vec<_>>().join(" ");
    if !collapsed.ends_with(['.', '!', '?']) {
        collapsed.push('.');
    }
    Some(collapsed)
}
/// Trims `text` and appends `.` unless the trimmed text already ends in `.`, `!` or `?`.
fn ensure_sentence(text: &str) -> String {
    let mut sentence = text.trim().to_owned();
    if !sentence.ends_with(['.', '!', '?']) {
        sentence.push('.');
    }
    sentence
}
/// Reports whether the operation declares at least one inline path parameter.
fn operation_has_path_params(operation: &Operation) -> bool {
    for param_ref in &operation.parameters {
        if let ReferenceOr::Item(Parameter::Path { .. }) = param_ref {
            return true;
        }
    }
    false
}
/// Reports whether the operation declares at least one inline query parameter.
fn operation_has_query_params(operation: &Operation) -> bool {
    for param_ref in &operation.parameters {
        if let ReferenceOr::Item(Parameter::Query { .. }) = param_ref {
            return true;
        }
    }
    false
}
/// Reports whether the operation declares a request body (inline or referenced).
fn operation_has_request_body(operation: &Operation) -> bool {
    let request_body = &operation.request_body;
    request_body.is_some()
}
/// String formats that map to a Python type needing its own import in the generated header.
#[derive(Clone, Copy)]
enum PythonSpecialStringType {
// `date` format, rendered as Python `date`.
Date,
// `date-time` format, rendered as Python `datetime`.
DateTime,
// `uuid` format (an unknown format to the OpenAPI crate), rendered as Python `UUID`.
Uuid,
}