1use super::{PythonDtoStyle, SchemaRegistry};
4use anyhow::{Result, bail};
5use heck::{ToPascalCase, ToSnakeCase};
6use openapiv3::{
7 OpenAPI, Operation, Parameter, ParameterData, ParameterSchemaOrContent, ReferenceOr, Schema, SchemaKind,
8 StringFormat, Type, VariantOrUnknownOrEmpty,
9};
10use std::collections::BTreeSet;
11
12pub struct PythonGenerator {
13 spec: OpenAPI,
14 dto: PythonDtoStyle,
15 registry: SchemaRegistry,
16}
17
18struct PythonFieldSpec {
19 original_name: String,
20 field_name: String,
21 type_hint: String,
22 required: bool,
23}
24
25impl PythonGenerator {
26 #[must_use]
27 pub fn new(spec: OpenAPI, dto: PythonDtoStyle) -> Self {
28 let registry = SchemaRegistry::from_spec(&spec);
29 Self { spec, dto, registry }
30 }
31
32 pub fn generate(&self) -> Result<String> {
33 let mut output = String::new();
34
35 output.push_str(&self.generate_header());
36
37 output.push_str(&self.generate_models()?);
38
39 output.push_str(&self.generate_routes()?);
40
41 output.push_str(&self.generate_main());
42
43 Ok(output)
44 }
45
46 fn generate_header(&self) -> String {
47 let mut header = String::new();
48 header.push_str("from __future__ import annotations\n\n");
49
50 let uses_path = self.uses_path_params();
51 let uses_query = self.uses_query_params();
52 let uses_body = self.uses_request_body();
53
54 let noqa = if uses_query {
57 "# ruff: noqa: B008, I001, INP001\n\n"
58 } else {
59 "# ruff: noqa: I001, INP001\n\n"
60 };
61 header.push_str(noqa);
62
63 let uses_literal = self.uses_literal_types();
64 let uses_date = self.uses_date_types();
65 let uses_datetime = self.uses_datetime_types();
66 let uses_uuid = self.uses_uuid_types();
67
68 match self.dto {
69 PythonDtoStyle::Dataclass => {
70 header.push_str(&format!(
71 r"# Generated by Spikard OpenAPI code generator
72# OpenAPI Version: {}
73# Title: {}
74# DO NOT EDIT - regenerate from OpenAPI schema
75
76from dataclasses import dataclass
77",
78 self.spec.openapi, self.spec.info.title,
79 ));
80 }
81 PythonDtoStyle::Msgspec => {
82 header.push_str(&format!(
83 r"# Generated by Spikard OpenAPI code generator
84# OpenAPI Version: {}
85# Title: {}
86# DO NOT EDIT - regenerate from OpenAPI schema
87
88import msgspec
89",
90 self.spec.openapi, self.spec.info.title,
91 ));
92 }
93 }
94
95 if uses_literal {
96 header.push_str("from typing import Literal\n");
97 }
98 if uses_date || uses_datetime {
99 if uses_date && uses_datetime {
100 header.push_str("from datetime import date, datetime\n");
101 } else if uses_date {
102 header.push_str("from datetime import date\n");
103 } else {
104 header.push_str("from datetime import datetime\n");
105 }
106 }
107 if uses_uuid {
108 header.push_str("from uuid import UUID\n");
109 }
110
111 header.push_str("from spikard import ");
112 header.push_str(&self.spikard_imports(uses_body, uses_path, uses_query));
113 header.push_str("\n\napp = Spikard()\n\n");
114
115 header
116 }
117
118 fn generate_models(&self) -> Result<String> {
119 if self.dto == PythonDtoStyle::Msgspec {
120 return self.generate_msgspec_models();
121 }
122 self.generate_dataclass_models()
123 }
124
125 fn generate_dataclass_models(&self) -> Result<String> {
126 let mut output = String::new();
127 output.push_str("# Schema Models\n\n");
128 let mut emitted = BTreeSet::new();
129
130 if let Some(components) = &self.spec.components {
131 for (name, schema_ref) in &components.schemas {
132 match schema_ref {
133 ReferenceOr::Item(schema) => {
134 self.generate_dataclass_family(&name.to_pascal_case(), schema, &mut emitted, &mut output)?;
135 }
136 ReferenceOr::Reference { .. } => continue,
137 }
138 }
139 }
140
141 self.generate_inline_route_models(&mut emitted, &mut output)?;
142
143 Ok(output)
144 }
145
146 fn generate_msgspec_models(&self) -> Result<String> {
147 let mut output = String::new();
148 output.push_str("# Schema Models\n\n");
149 let mut emitted = BTreeSet::new();
150
151 if let Some(components) = &self.spec.components {
152 for (name, schema_ref) in &components.schemas {
153 match schema_ref {
154 ReferenceOr::Item(schema) => {
155 self.generate_msgspec_family(&name.to_pascal_case(), schema, &mut emitted, &mut output)?;
156 }
157 ReferenceOr::Reference { .. } => continue,
158 }
159 }
160 }
161
162 self.generate_inline_route_models(&mut emitted, &mut output)?;
163
164 Ok(output)
165 }
166
167 fn generate_inline_route_models(&self, emitted: &mut BTreeSet<String>, output: &mut String) -> Result<()> {
168 for path_item_ref in self.spec.paths.paths.values() {
169 let ReferenceOr::Item(path_item) = path_item_ref else {
170 continue;
171 };
172
173 for operation in [
174 path_item.get.as_ref(),
175 path_item.post.as_ref(),
176 path_item.put.as_ref(),
177 path_item.delete.as_ref(),
178 path_item.patch.as_ref(),
179 ]
180 .into_iter()
181 .flatten()
182 {
183 if let Some((class_name, schema)) = self.inline_request_body_model(operation) {
184 self.generate_model_family(&class_name, schema, emitted, output)?;
185 }
186 if let Some((class_name, schema)) = self.inline_response_model(operation) {
187 self.generate_model_family(&class_name, schema, emitted, output)?;
188 }
189 }
190 }
191
192 Ok(())
193 }
194
195 fn generate_model_family(
196 &self,
197 class_name: &str,
198 schema: &Schema,
199 emitted: &mut BTreeSet<String>,
200 output: &mut String,
201 ) -> Result<()> {
202 match self.dto {
203 PythonDtoStyle::Dataclass => self.generate_dataclass_family(class_name, schema, emitted, output),
204 PythonDtoStyle::Msgspec => self.generate_msgspec_family(class_name, schema, emitted, output),
205 }
206 }
207
208 fn generate_dataclass_family(
209 &self,
210 class_name: &str,
211 schema: &Schema,
212 emitted: &mut BTreeSet<String>,
213 output: &mut String,
214 ) -> Result<()> {
215 if !emitted.insert(class_name.to_string()) {
216 return Ok(());
217 }
218
219 self.generate_nested_families(class_name, schema, emitted, output)?;
220 output.push_str(&self.generate_dataclass(class_name, schema)?);
221 output.push('\n');
222 Ok(())
223 }
224
225 fn generate_msgspec_family(
226 &self,
227 class_name: &str,
228 schema: &Schema,
229 emitted: &mut BTreeSet<String>,
230 output: &mut String,
231 ) -> Result<()> {
232 if !emitted.insert(class_name.to_string()) {
233 return Ok(());
234 }
235
236 self.generate_nested_families(class_name, schema, emitted, output)?;
237 output.push_str(&self.generate_msgspec_struct(class_name, schema)?);
238 output.push('\n');
239 Ok(())
240 }
241
242 fn generate_nested_families(
243 &self,
244 parent_class_name: &str,
245 schema: &Schema,
246 emitted: &mut BTreeSet<String>,
247 output: &mut String,
248 ) -> Result<()> {
249 match &schema.schema_kind {
250 SchemaKind::Type(Type::Object(obj)) => {
251 for (prop_name, prop_schema_ref) in &obj.properties {
252 match prop_schema_ref {
253 ReferenceOr::Item(prop_schema) => {
254 if let Some(class_name) = self.inline_model_name(parent_class_name, prop_name, prop_schema)
255 {
256 self.generate_model_family(&class_name, prop_schema, emitted, output)?;
257 }
258 if let Some(array_item_name) =
259 self.inline_array_item_model_name(parent_class_name, prop_name, prop_schema)
260 && let Some(item_schema) = inline_array_item_schema(prop_schema)
261 {
262 self.generate_model_family(&array_item_name, item_schema, emitted, output)?;
263 }
264 }
265 ReferenceOr::Reference { .. } => {}
266 }
267 }
268 }
269 SchemaKind::AllOf { all_of } => {
270 for schema_ref in all_of {
271 match schema_ref {
272 ReferenceOr::Item(item) => {
273 self.generate_nested_families(parent_class_name, item, emitted, output)?
274 }
275 ReferenceOr::Reference { reference } => {
276 if let Some(resolved) = self.resolve_schema_reference(reference) {
277 self.generate_nested_families(parent_class_name, resolved, emitted, output)?;
278 }
279 }
280 }
281 }
282 }
283 _ => {}
284 }
285
286 Ok(())
287 }
288
289 fn generate_dataclass(&self, class_name: &str, schema: &Schema) -> Result<String> {
290 if self.dto != PythonDtoStyle::Dataclass {
291 bail!("dataclass generation called for non-dataclass style");
292 }
293
294 let mut output = String::new();
295
296 output.push_str("@dataclass(slots=True, kw_only=True)\n");
297 output.push_str(&format!("class {class_name}:\n"));
298 output.push_str(&format!(
299 " \"\"\"{}\"\"\"\n",
300 ensure_sentence(
301 schema
302 .schema_data
303 .description
304 .as_deref()
305 .map(str::trim)
306 .filter(|value| !value.is_empty())
307 .unwrap_or("Generated OpenAPI schema model.")
308 )
309 ));
310
311 let mut fields = Vec::new();
312 self.collect_model_fields_into(class_name, schema, &mut fields);
313
314 if !fields.is_empty() {
315 fields.sort_by_key(|field| !field.required);
316
317 for field in fields {
318 if field.required {
319 output.push_str(&format!(" {}: {}\n", field.field_name, field.type_hint));
320 } else {
321 output.push_str(&format!(" {}: {} = None\n", field.field_name, field.type_hint));
322 }
323 }
324 }
325
326 Ok(output)
327 }
328
329 fn generate_msgspec_struct(&self, name: &str, schema: &Schema) -> Result<String> {
330 let class_name = name.to_pascal_case();
331 let mut output = String::new();
332
333 output.push_str(&format!("class {class_name}(msgspec.Struct):\n"));
334 output.push_str(&format!(
335 " \"\"\"{}\"\"\"\n",
336 ensure_sentence(
337 schema
338 .schema_data
339 .description
340 .as_deref()
341 .map(str::trim)
342 .filter(|value| !value.is_empty())
343 .unwrap_or("Generated OpenAPI schema model.")
344 )
345 ));
346
347 let mut fields = Vec::new();
348 self.collect_model_fields_into(&class_name, schema, &mut fields);
349
350 if !fields.is_empty() {
351 fields.sort_by_key(|field| !field.required);
352
353 for field in fields {
354 if field.required {
355 output.push_str(&format!(" {}: {}\n", field.field_name, field.type_hint));
356 } else {
357 output.push_str(&format!(" {}: {} = None\n", field.field_name, field.type_hint));
358 }
359 }
360 }
361
362 Ok(output)
363 }
364
365 fn collect_model_fields_into(&self, parent_class_name: &str, schema: &Schema, fields: &mut Vec<PythonFieldSpec>) {
366 match &schema.schema_kind {
367 SchemaKind::Type(Type::Object(obj)) => {
368 for (prop_name, prop_schema_ref) in &obj.properties {
369 if fields.iter().any(|field| field.original_name == *prop_name) {
370 continue;
371 }
372
373 let is_required = obj.required.contains(prop_name);
374 fields.push(PythonFieldSpec {
375 original_name: prop_name.clone(),
376 field_name: prop_name.to_snake_case(),
377 type_hint: self.python_type_from_boxed_schema_ref(
378 parent_class_name,
379 prop_name,
380 prop_schema_ref,
381 !is_required,
382 ),
383 required: is_required,
384 });
385 }
386 }
387 SchemaKind::AllOf { all_of } => {
388 for schema_ref in all_of {
389 match schema_ref {
390 ReferenceOr::Item(schema) => self.collect_model_fields_into(parent_class_name, schema, fields),
391 ReferenceOr::Reference { reference } => {
392 if let Some(schema) = self.resolve_schema_reference(reference) {
393 self.collect_model_fields_into(parent_class_name, schema, fields);
394 }
395 }
396 }
397 }
398 }
399 _ => {}
400 }
401 }
402
403 fn resolve_schema_reference<'a>(&'a self, reference: &str) -> Option<&'a Schema> {
404 let name = reference.split('/').next_back()?;
405 self.spec
406 .components
407 .as_ref()?
408 .schemas
409 .get(name)
410 .and_then(|schema_ref| match schema_ref {
411 ReferenceOr::Item(schema) => Some(schema),
412 ReferenceOr::Reference { .. } => None,
413 })
414 }
415
416 fn extract_type_from_schema_ref(&self, schema_ref: &ReferenceOr<Schema>, inline_name: Option<&str>) -> String {
418 self.python_type_from_schema_ref(None, None, schema_ref, false, inline_name)
419 }
420
421 fn extract_request_body_type(&self, operation: &Operation) -> Option<String> {
423 operation.request_body.as_ref().and_then(|body_ref| match body_ref {
424 ReferenceOr::Item(request_body) => request_body.content.get("application/json").and_then(|media_type| {
425 media_type.schema.as_ref().map(|schema_ref| {
426 self.extract_type_from_schema_ref(
427 schema_ref,
428 operation
429 .operation_id
430 .as_deref()
431 .map(|id| format!("{}RequestBody", id.to_pascal_case()))
432 .as_deref(),
433 )
434 })
435 }),
436 ReferenceOr::Reference { reference } => {
437 let ref_name = reference.split('/').next_back().unwrap();
438 Some(ref_name.to_pascal_case())
439 }
440 })
441 }
442
443 fn extract_response_type(&self, operation: &Operation) -> String {
445 use openapiv3::StatusCode;
446
447 let response = operation
448 .responses
449 .responses
450 .get(&StatusCode::Code(200))
451 .or_else(|| operation.responses.responses.get(&StatusCode::Code(201)))
452 .or_else(|| operation.responses.responses.get(&StatusCode::Range(2)));
453
454 if let Some(response_ref) = response {
455 match response_ref {
456 ReferenceOr::Item(response) => {
457 if let Some(content) = response.content.get("application/json")
458 && let Some(schema_ref) = &content.schema
459 {
460 return self.extract_type_from_schema_ref(
461 schema_ref,
462 operation
463 .operation_id
464 .as_deref()
465 .map(|id| format!("{}ResponseBody", id.to_pascal_case()))
466 .as_deref(),
467 );
468 }
469 }
470 ReferenceOr::Reference { reference } => {
471 let ref_name = reference.split('/').next_back().unwrap();
472 return ref_name.to_pascal_case();
473 }
474 }
475 }
476
477 "dict[str, object]".to_string()
478 }
479
480 fn python_type_from_schema_ref(
481 &self,
482 parent_class_name: Option<&str>,
483 field_name: Option<&str>,
484 schema_ref: &ReferenceOr<Schema>,
485 optional: bool,
486 inline_name: Option<&str>,
487 ) -> String {
488 match schema_ref {
489 ReferenceOr::Item(schema) => {
490 self.schema_to_python_type(parent_class_name, field_name, schema, optional, inline_name)
491 }
492 ReferenceOr::Reference { reference } => self.python_type_from_reference(reference, optional),
493 }
494 }
495
496 fn python_type_from_boxed_schema_ref(
497 &self,
498 parent_class_name: &str,
499 field_name: &str,
500 schema_ref: &ReferenceOr<Box<Schema>>,
501 optional: bool,
502 ) -> String {
503 match schema_ref {
504 ReferenceOr::Item(schema) => {
505 self.schema_to_python_type(Some(parent_class_name), Some(field_name), schema, optional, None)
506 }
507 ReferenceOr::Reference { reference } => self.python_type_from_reference(reference, optional),
508 }
509 }
510
511 fn python_type_from_reference(&self, reference: &str, optional: bool) -> String {
512 let mut base = reference.split('/').next_back().unwrap().to_pascal_case();
513 if let Some(schema) = self.registry.resolve_reference(reference)
514 && schema.schema_data.nullable
515 {
516 base = self.append_optional(base, true);
517 }
518 self.append_optional(base, optional)
519 }
520
521 #[allow(clippy::only_used_in_recursion)]
522 fn schema_to_python_type(
523 &self,
524 parent_class_name: Option<&str>,
525 field_name: Option<&str>,
526 schema: &Schema,
527 optional: bool,
528 inline_name: Option<&str>,
529 ) -> String {
530 let mut base_type = if let Some(literal_type) = self.literal_type(schema) {
531 literal_type
532 } else {
533 match &schema.schema_kind {
534 SchemaKind::Type(Type::String(string_type)) => self.string_format_python_type(string_type),
535 SchemaKind::Type(Type::Number(_)) => "float".to_string(),
536 SchemaKind::Type(Type::Integer(_)) => "int".to_string(),
537 SchemaKind::Type(Type::Boolean(_)) => "bool".to_string(),
538 SchemaKind::Type(Type::Array(arr)) => {
539 let item_type = match &arr.items {
540 Some(item_schema) => self.python_array_item_type(parent_class_name, field_name, item_schema),
541 None => "dict[str, object]".to_string(),
542 };
543 format!("list[{item_type}]")
544 }
545 SchemaKind::Type(Type::Object(obj)) if !obj.properties.is_empty() => inline_name
546 .map(ToOwned::to_owned)
547 .or_else(|| {
548 parent_class_name
549 .zip(field_name)
550 .map(|(parent, field)| format!("{parent}{}", field.to_pascal_case()))
551 })
552 .unwrap_or_else(|| "dict[str, object]".to_string()),
553 SchemaKind::Type(Type::Object(_)) => "dict[str, object]".to_string(),
554 _ => "dict[str, object]".to_string(),
555 }
556 };
557
558 if schema.schema_data.nullable {
559 base_type = self.append_optional(base_type, true);
560 }
561
562 self.append_optional(base_type, optional)
563 }
564
565 fn literal_type(&self, schema: &Schema) -> Option<String> {
566 match &schema.schema_kind {
567 SchemaKind::Type(Type::String(string_type)) => {
568 let values = string_type
569 .enumeration
570 .iter()
571 .flatten()
572 .map(|value| serde_json::to_string(value).ok())
573 .collect::<Option<Vec<_>>>()?;
574 (!values.is_empty()).then(|| format!("Literal[{}]", values.join(", ")))
575 }
576 SchemaKind::Type(Type::Integer(integer_type)) => {
577 let values = integer_type
578 .enumeration
579 .iter()
580 .flatten()
581 .map(ToString::to_string)
582 .collect::<Vec<_>>();
583 (!values.is_empty()).then(|| format!("Literal[{}]", values.join(", ")))
584 }
585 SchemaKind::Type(Type::Number(number_type)) => {
586 let values = number_type
587 .enumeration
588 .iter()
589 .flatten()
590 .map(ToString::to_string)
591 .collect::<Vec<_>>();
592 (!values.is_empty()).then(|| format!("Literal[{}]", values.join(", ")))
593 }
594 _ => None,
595 }
596 }
597
598 fn string_format_python_type(&self, string_type: &openapiv3::StringType) -> String {
599 match &string_type.format {
600 VariantOrUnknownOrEmpty::Item(StringFormat::Date) => "date".to_string(),
601 VariantOrUnknownOrEmpty::Item(StringFormat::DateTime) => "datetime".to_string(),
602 VariantOrUnknownOrEmpty::Item(StringFormat::Byte | StringFormat::Binary) => "bytes".to_string(),
603 VariantOrUnknownOrEmpty::Unknown(format) if format == "uuid" => "UUID".to_string(),
604 _ => "str".to_string(),
605 }
606 }
607
608 fn append_optional(&self, base: String, optional: bool) -> String {
609 if optional && !base.trim().ends_with("| None") {
610 format!("{base} | None")
611 } else {
612 base
613 }
614 }
615
616 fn python_array_item_type(
617 &self,
618 parent_class_name: Option<&str>,
619 field_name: Option<&str>,
620 schema_ref: &ReferenceOr<Box<Schema>>,
621 ) -> String {
622 match schema_ref {
623 ReferenceOr::Item(schema) => {
624 let inline_item_name = parent_class_name
625 .zip(field_name)
626 .and_then(|(parent, field)| self.inline_array_item_model_name(parent, field, schema));
627 self.schema_to_python_type(None, None, schema, false, inline_item_name.as_deref())
628 }
629 ReferenceOr::Reference { reference } => self.python_type_from_reference(reference, false),
630 }
631 }
632
633 fn inline_model_name(&self, parent_class_name: &str, field_name: &str, schema: &Schema) -> Option<String> {
634 match &schema.schema_kind {
635 SchemaKind::Type(Type::Object(obj)) if !obj.properties.is_empty() => {
636 Some(format!("{parent_class_name}{}", field_name.to_pascal_case()))
637 }
638 _ => None,
639 }
640 }
641
642 fn inline_array_item_model_name(
643 &self,
644 parent_class_name: &str,
645 field_name: &str,
646 schema: &Schema,
647 ) -> Option<String> {
648 let item_schema = inline_array_item_schema(schema)?;
649 match &item_schema.schema_kind {
650 SchemaKind::Type(Type::Object(obj)) if !obj.properties.is_empty() => {
651 Some(format!("{parent_class_name}{}Item", field_name.to_pascal_case()))
652 }
653 _ => None,
654 }
655 }
656
657 fn inline_request_body_model<'a>(&self, operation: &'a Operation) -> Option<(String, &'a Schema)> {
658 let operation_id = operation.operation_id.as_deref()?;
659 let request_body = operation.request_body.as_ref()?;
660
661 match request_body {
662 ReferenceOr::Item(body) => {
663 let schema_ref = body.content.get("application/json")?.schema.as_ref()?;
664 match schema_ref {
665 ReferenceOr::Item(schema) => {
666 if is_named_inline_object_schema(schema) {
667 Some((format!("{}RequestBody", operation_id.to_pascal_case()), schema))
668 } else {
669 None
670 }
671 }
672 ReferenceOr::Reference { .. } => None,
673 }
674 }
675 ReferenceOr::Reference { .. } => None,
676 }
677 }
678
679 fn inline_response_model<'a>(&self, operation: &'a Operation) -> Option<(String, &'a Schema)> {
680 use openapiv3::StatusCode;
681
682 let operation_id = operation.operation_id.as_deref()?;
683 let response = operation
684 .responses
685 .responses
686 .get(&StatusCode::Code(200))
687 .or_else(|| operation.responses.responses.get(&StatusCode::Code(201)))
688 .or_else(|| operation.responses.responses.get(&StatusCode::Range(2)))?;
689
690 match response {
691 ReferenceOr::Item(response) => {
692 let schema_ref = response.content.get("application/json")?.schema.as_ref()?;
693 match schema_ref {
694 ReferenceOr::Item(schema) => {
695 if is_named_inline_object_schema(schema) {
696 Some((format!("{}ResponseBody", operation_id.to_pascal_case()), schema))
697 } else {
698 None
699 }
700 }
701 ReferenceOr::Reference { .. } => None,
702 }
703 }
704 ReferenceOr::Reference { .. } => None,
705 }
706 }
707
708 fn uses_path_params(&self) -> bool {
709 self.spec.paths.paths.values().any(|path_item_ref| {
710 let ReferenceOr::Item(path_item) = path_item_ref else {
711 return false;
712 };
713
714 path_item.get.as_ref().is_some_and(operation_has_path_params)
715 || path_item.post.as_ref().is_some_and(operation_has_path_params)
716 || path_item.put.as_ref().is_some_and(operation_has_path_params)
717 || path_item.delete.as_ref().is_some_and(operation_has_path_params)
718 || path_item.patch.as_ref().is_some_and(operation_has_path_params)
719 })
720 }
721
722 fn uses_query_params(&self) -> bool {
723 self.spec.paths.paths.values().any(|path_item_ref| {
724 let ReferenceOr::Item(path_item) = path_item_ref else {
725 return false;
726 };
727
728 path_item.get.as_ref().is_some_and(operation_has_query_params)
729 || path_item.post.as_ref().is_some_and(operation_has_query_params)
730 || path_item.put.as_ref().is_some_and(operation_has_query_params)
731 || path_item.delete.as_ref().is_some_and(operation_has_query_params)
732 || path_item.patch.as_ref().is_some_and(operation_has_query_params)
733 })
734 }
735
736 fn uses_request_body(&self) -> bool {
737 self.spec.paths.paths.values().any(|path_item_ref| {
738 let ReferenceOr::Item(path_item) = path_item_ref else {
739 return false;
740 };
741
742 path_item.get.as_ref().is_some_and(operation_has_request_body)
743 || path_item.post.as_ref().is_some_and(operation_has_request_body)
744 || path_item.put.as_ref().is_some_and(operation_has_request_body)
745 || path_item.delete.as_ref().is_some_and(operation_has_request_body)
746 || path_item.patch.as_ref().is_some_and(operation_has_request_body)
747 })
748 }
749
750 fn uses_literal_types(&self) -> bool {
751 self.spec.components.as_ref().is_some_and(|components| {
752 components
753 .schemas
754 .values()
755 .filter_map(|schema_ref| self.registry.resolve(schema_ref))
756 .any(|schema| self.schema_uses_literal_type(schema))
757 }) || self.spec.paths.paths.values().any(|path_item_ref| {
758 let ReferenceOr::Item(path_item) = path_item_ref else {
759 return false;
760 };
761
762 [
763 path_item.get.as_ref(),
764 path_item.post.as_ref(),
765 path_item.put.as_ref(),
766 path_item.delete.as_ref(),
767 path_item.patch.as_ref(),
768 ]
769 .into_iter()
770 .flatten()
771 .any(|operation| self.operation_uses_literal_types(operation))
772 })
773 }
774
775 fn uses_date_types(&self) -> bool {
776 self.uses_special_string_type(PythonSpecialStringType::Date)
777 }
778
779 fn uses_datetime_types(&self) -> bool {
780 self.uses_special_string_type(PythonSpecialStringType::DateTime)
781 }
782
783 fn uses_uuid_types(&self) -> bool {
784 self.uses_special_string_type(PythonSpecialStringType::Uuid)
785 }
786
787 fn uses_special_string_type(&self, target: PythonSpecialStringType) -> bool {
788 self.spec.components.as_ref().is_some_and(|components| {
789 components
790 .schemas
791 .values()
792 .filter_map(|schema_ref| self.registry.resolve(schema_ref))
793 .any(|schema| self.schema_uses_special_string_type(schema, target))
794 }) || self.spec.paths.paths.values().any(|path_item_ref| {
795 let ReferenceOr::Item(path_item) = path_item_ref else {
796 return false;
797 };
798
799 [
800 path_item.get.as_ref(),
801 path_item.post.as_ref(),
802 path_item.put.as_ref(),
803 path_item.delete.as_ref(),
804 path_item.patch.as_ref(),
805 ]
806 .into_iter()
807 .flatten()
808 .any(|operation| self.operation_uses_special_string_type(operation, target))
809 })
810 }
811
812 fn operation_uses_literal_types(&self, operation: &Operation) -> bool {
813 operation.parameters.iter().any(|parameter_ref| {
814 let ReferenceOr::Item(parameter) = parameter_ref else {
815 return false;
816 };
817
818 match parameter {
819 Parameter::Path { parameter_data, .. }
820 | Parameter::Query { parameter_data, .. }
821 | Parameter::Header { parameter_data, .. }
822 | Parameter::Cookie { parameter_data, .. } => self.parameter_uses_literal_type(parameter_data),
823 }
824 }) || self
825 .inline_request_body_model(operation)
826 .is_some_and(|(_, schema)| self.schema_uses_literal_type(schema))
827 || self
828 .inline_response_model(operation)
829 .is_some_and(|(_, schema)| self.schema_uses_literal_type(schema))
830 || operation
831 .request_body
832 .as_ref()
833 .and_then(|body_ref| match body_ref {
834 ReferenceOr::Item(request_body) => request_body
835 .content
836 .get("application/json")
837 .and_then(|media_type| media_type.schema.as_ref())
838 .and_then(|schema_ref| self.registry.resolve(schema_ref)),
839 ReferenceOr::Reference { .. } => None,
840 })
841 .is_some_and(|schema| self.schema_uses_literal_type(schema))
842 }
843
844 fn parameter_uses_literal_type(&self, parameter_data: &ParameterData) -> bool {
845 let ParameterSchemaOrContent::Schema(schema_ref) = ¶meter_data.format else {
846 return false;
847 };
848 self.registry
849 .resolve(schema_ref)
850 .is_some_and(|schema| self.schema_uses_literal_type(schema))
851 }
852
853 fn operation_uses_special_string_type(&self, operation: &Operation, target: PythonSpecialStringType) -> bool {
854 operation.parameters.iter().any(|parameter_ref| {
855 let ReferenceOr::Item(parameter) = parameter_ref else {
856 return false;
857 };
858
859 match parameter {
860 Parameter::Path { parameter_data, .. }
861 | Parameter::Query { parameter_data, .. }
862 | Parameter::Header { parameter_data, .. }
863 | Parameter::Cookie { parameter_data, .. } => {
864 self.parameter_uses_special_string_type(parameter_data, target)
865 }
866 }
867 }) || operation
868 .request_body
869 .as_ref()
870 .and_then(|body_ref| match body_ref {
871 ReferenceOr::Item(request_body) => request_body
872 .content
873 .get("application/json")
874 .and_then(|media_type| media_type.schema.as_ref())
875 .and_then(|schema_ref| self.registry.resolve(schema_ref)),
876 ReferenceOr::Reference { .. } => None,
877 })
878 .is_some_and(|schema| self.schema_uses_special_string_type(schema, target))
879 || operation
880 .responses
881 .responses
882 .values()
883 .filter_map(|response_ref| match response_ref {
884 ReferenceOr::Item(response) => response
885 .content
886 .get("application/json")
887 .and_then(|media_type| media_type.schema.as_ref())
888 .and_then(|schema_ref| self.registry.resolve(schema_ref)),
889 ReferenceOr::Reference { .. } => None,
890 })
891 .any(|schema| self.schema_uses_special_string_type(schema, target))
892 }
893
894 fn parameter_uses_special_string_type(
895 &self,
896 parameter_data: &ParameterData,
897 target: PythonSpecialStringType,
898 ) -> bool {
899 let ParameterSchemaOrContent::Schema(schema_ref) = ¶meter_data.format else {
900 return false;
901 };
902 self.registry
903 .resolve(schema_ref)
904 .is_some_and(|schema| self.schema_uses_special_string_type(schema, target))
905 }
906
907 fn schema_uses_literal_type(&self, schema: &Schema) -> bool {
908 if self.literal_type(schema).is_some() {
909 return true;
910 }
911
912 match &schema.schema_kind {
913 SchemaKind::Type(Type::Array(array_type)) => array_type
914 .items
915 .as_ref()
916 .and_then(|item_schema| match item_schema {
917 ReferenceOr::Item(item) => Some(item.as_ref()),
918 ReferenceOr::Reference { reference } => self.resolve_schema_reference(reference),
919 })
920 .is_some_and(|item| self.schema_uses_literal_type(item)),
921 SchemaKind::Type(Type::Object(object_type)) => {
922 object_type.properties.values().any(|schema_ref| match schema_ref {
923 ReferenceOr::Item(schema) => self.schema_uses_literal_type(schema),
924 ReferenceOr::Reference { reference } => self
925 .resolve_schema_reference(reference)
926 .is_some_and(|schema| self.schema_uses_literal_type(schema)),
927 })
928 }
929 SchemaKind::AllOf { all_of } => all_of.iter().any(|schema_ref| match schema_ref {
930 ReferenceOr::Item(schema) => self.schema_uses_literal_type(schema),
931 ReferenceOr::Reference { reference } => self
932 .resolve_schema_reference(reference)
933 .is_some_and(|schema| self.schema_uses_literal_type(schema)),
934 }),
935 _ => false,
936 }
937 }
938
939 fn schema_uses_special_string_type(&self, schema: &Schema, target: PythonSpecialStringType) -> bool {
940 if self.schema_matches_special_string_type(schema, target) {
941 return true;
942 }
943
944 match &schema.schema_kind {
945 SchemaKind::Type(Type::Array(array_type)) => array_type
946 .items
947 .as_ref()
948 .and_then(|item_schema| match item_schema {
949 ReferenceOr::Item(item) => Some(item.as_ref()),
950 ReferenceOr::Reference { reference } => self.resolve_schema_reference(reference),
951 })
952 .is_some_and(|item| self.schema_uses_special_string_type(item, target)),
953 SchemaKind::Type(Type::Object(object_type)) => {
954 object_type.properties.values().any(|schema_ref| match schema_ref {
955 ReferenceOr::Item(schema) => self.schema_uses_special_string_type(schema, target),
956 ReferenceOr::Reference { reference } => self
957 .resolve_schema_reference(reference)
958 .is_some_and(|schema| self.schema_uses_special_string_type(schema, target)),
959 })
960 }
961 SchemaKind::AllOf { all_of } => all_of.iter().any(|schema_ref| match schema_ref {
962 ReferenceOr::Item(schema) => self.schema_uses_special_string_type(schema, target),
963 ReferenceOr::Reference { reference } => self
964 .resolve_schema_reference(reference)
965 .is_some_and(|schema| self.schema_uses_special_string_type(schema, target)),
966 }),
967 _ => false,
968 }
969 }
970
971 fn schema_matches_special_string_type(&self, schema: &Schema, target: PythonSpecialStringType) -> bool {
972 let SchemaKind::Type(Type::String(string_type)) = &schema.schema_kind else {
973 return false;
974 };
975
976 match (&string_type.format, target) {
977 (VariantOrUnknownOrEmpty::Item(StringFormat::Date), PythonSpecialStringType::Date) => true,
978 (VariantOrUnknownOrEmpty::Item(StringFormat::DateTime), PythonSpecialStringType::DateTime) => true,
979 (VariantOrUnknownOrEmpty::Unknown(format), PythonSpecialStringType::Uuid) if format == "uuid" => true,
980 _ => false,
981 }
982 }
983
984 fn parameter_type_hint(&self, parameter_data: &ParameterData) -> String {
985 match ¶meter_data.format {
986 ParameterSchemaOrContent::Schema(schema_ref) => {
987 self.python_type_from_schema_ref(None, None, schema_ref, !parameter_data.required, None)
988 }
989 ParameterSchemaOrContent::Content(_) => {
990 self.append_optional("dict[str, object]".to_string(), !parameter_data.required)
991 }
992 }
993 }
994
995 fn spikard_imports(&self, uses_body: bool, uses_path: bool, uses_query: bool) -> String {
996 let mut imports = Vec::new();
997 if uses_body {
998 imports.push("Body");
999 }
1000 if uses_path {
1001 imports.push("Path");
1002 }
1003 if uses_query {
1004 imports.push("Query");
1005 }
1006 imports.push("Request");
1007 imports.push("Spikard");
1008 imports.push("route");
1009 imports.join(", ")
1010 }
1011
1012 fn generate_routes(&self) -> Result<String> {
1013 let mut output = String::new();
1014 output.push_str("\n# Route Handlers\n\n");
1015
1016 for (path, path_item_ref) in &self.spec.paths.paths {
1017 let path_item = match path_item_ref {
1018 ReferenceOr::Item(item) => item,
1019 ReferenceOr::Reference { .. } => continue,
1020 };
1021
1022 if let Some(op) = &path_item.get {
1023 output.push_str(&self.generate_route_handler(path, "get", op)?);
1024 }
1025 if let Some(op) = &path_item.post {
1026 output.push_str(&self.generate_route_handler(path, "post", op)?);
1027 }
1028 if let Some(op) = &path_item.put {
1029 output.push_str(&self.generate_route_handler(path, "put", op)?);
1030 }
1031 if let Some(op) = &path_item.delete {
1032 output.push_str(&self.generate_route_handler(path, "delete", op)?);
1033 }
1034 if let Some(op) = &path_item.patch {
1035 output.push_str(&self.generate_route_handler(path, "patch", op)?);
1036 }
1037 }
1038
1039 Ok(output)
1040 }
1041
1042 fn generate_route_handler(&self, path: &str, method: &str, operation: &Operation) -> Result<String> {
1043 let mut output = String::new();
1044
1045 let func_name = operation
1046 .operation_id
1047 .as_ref()
1048 .map(|id| id.to_snake_case())
1049 .unwrap_or_else(|| {
1050 format!(
1051 "{}_{}",
1052 method,
1053 path.replace('/', "_").replace(['{', '}'], "").trim_matches('_')
1054 )
1055 });
1056
1057 let mut path_params = Vec::new();
1058 let mut query_params = Vec::new();
1059
1060 for param_ref in &operation.parameters {
1061 if let ReferenceOr::Item(param) = param_ref {
1062 match param {
1063 Parameter::Path { parameter_data, .. } => {
1064 path_params.push((parameter_data.name.clone(), self.parameter_type_hint(parameter_data)));
1065 }
1066 Parameter::Query { parameter_data, .. } => {
1067 let type_hint = self.parameter_type_hint(parameter_data);
1068 query_params.push((parameter_data.name.clone(), type_hint, parameter_data.required));
1069 }
1070 _ => {}
1071 }
1072 }
1073 }
1074
1075 let body_type = self.extract_request_body_type(operation);
1076
1077 let return_type = self.extract_response_type(operation);
1078
1079 output.push_str(&format!(
1080 "@route(\"{}\", methods=[\"{}\"])\n",
1081 path,
1082 method.to_uppercase()
1083 ));
1084
1085 output.push_str(&format!("def {func_name}(request: Request"));
1086
1087 for (param_name, param_type) in &path_params {
1088 output.push_str(&format!(", {}: Path[{}]", param_name.to_snake_case(), param_type));
1089 }
1090
1091 for (param_name, param_type, required) in query_params.iter().filter(|(_, _, required)| *required) {
1092 output.push_str(&format!(", {}: Query[{}]", param_name.to_snake_case(), param_type));
1093 }
1094
1095 for (param_name, param_type, required) in query_params.iter().filter(|(_, _, required)| !*required) {
1096 let _ = required;
1097 output.push_str(&format!(
1098 ", {}: Query[{}] = Query(default=None)",
1099 param_name.to_snake_case(),
1100 param_type
1101 ));
1102 }
1103
1104 if let Some(body_type_name) = &body_type {
1105 output.push_str(&format!(", body: Body[{body_type_name}]"));
1106 }
1107
1108 output.push_str(&format!(") -> {return_type}:\n"));
1109
1110 let docstring =
1111 summarize_operation_doc(operation).unwrap_or_else(|| format!("Handle {} {}.", method.to_uppercase(), path));
1112 output.push_str(&format!(" \"\"\"{docstring}\"\"\"\n"));
1113
1114 output.push_str(" raise NotImplementedError(\"TODO: Implement this endpoint\")\n\n");
1115
1116 Ok(output)
1117 }
1118
1119 fn generate_main(&self) -> String {
1120 r#"
1121# Run the application
1122if __name__ == "__main__":
1123 from spikard.config import ServerConfig
1124
1125 app.run(config=ServerConfig(host="0.0.0.0", port=8000))
1126"#
1127 .to_string()
1128 }
1129}
1130
1131fn is_named_inline_object_schema(schema: &Schema) -> bool {
1132 matches!(&schema.schema_kind, SchemaKind::Type(Type::Object(obj)) if !obj.properties.is_empty())
1133}
1134
1135fn inline_array_item_schema(schema: &Schema) -> Option<&Schema> {
1136 match &schema.schema_kind {
1137 SchemaKind::Type(Type::Array(array_type)) => match &array_type.items {
1138 Some(ReferenceOr::Item(item_schema)) => Some(item_schema),
1139 _ => None,
1140 },
1141 _ => None,
1142 }
1143}
1144
1145fn summarize_operation_doc(operation: &Operation) -> Option<String> {
1146 operation
1147 .summary
1148 .as_deref()
1149 .or(operation.description.as_deref())
1150 .map(str::trim)
1151 .filter(|value| !value.is_empty())
1152 .map(|value| {
1153 let collapsed = value.split_whitespace().collect::<Vec<_>>().join(" ");
1154 if collapsed.ends_with(['.', '!', '?']) {
1155 collapsed
1156 } else {
1157 format!("{collapsed}.")
1158 }
1159 })
1160}
1161
1162fn ensure_sentence(text: &str) -> String {
1163 let trimmed = text.trim();
1164 if trimmed.ends_with(['.', '!', '?']) {
1165 trimmed.to_string()
1166 } else {
1167 format!("{trimmed}.")
1168 }
1169}
1170
1171fn operation_has_path_params(operation: &Operation) -> bool {
1172 operation
1173 .parameters
1174 .iter()
1175 .any(|param_ref| matches!(param_ref, ReferenceOr::Item(Parameter::Path { .. })))
1176}
1177
1178fn operation_has_query_params(operation: &Operation) -> bool {
1179 operation
1180 .parameters
1181 .iter()
1182 .any(|param_ref| matches!(param_ref, ReferenceOr::Item(Parameter::Query { .. })))
1183}
1184
1185fn operation_has_request_body(operation: &Operation) -> bool {
1186 operation.request_body.is_some()
1187}
1188
1189#[derive(Clone, Copy)]
1190enum PythonSpecialStringType {
1191 Date,
1192 DateTime,
1193 Uuid,
1194}