import os
import typing
import typing as t
import weakref
from collections import ChainMap
from functools import lru_cache
from functools import partial
from functools import reduce
from types import CodeType
from markupsafe import Markup
from . import nodes
from .compiler import CodeGenerator
from .compiler import generate
from .defaults import BLOCK_END_STRING
from .defaults import BLOCK_START_STRING
from .defaults import COMMENT_END_STRING
from .defaults import COMMENT_START_STRING
from .defaults import DEFAULT_FILTERS
from .defaults import DEFAULT_NAMESPACE
from .defaults import DEFAULT_POLICIES
from .defaults import DEFAULT_TESTS
from .defaults import KEEP_TRAILING_NEWLINE
from .defaults import LINE_COMMENT_PREFIX
from .defaults import LINE_STATEMENT_PREFIX
from .defaults import LSTRIP_BLOCKS
from .defaults import NEWLINE_SEQUENCE
from .defaults import TRIM_BLOCKS
from .defaults import VARIABLE_END_STRING
from .defaults import VARIABLE_START_STRING
from .exceptions import TemplateNotFound
from .exceptions import TemplateRuntimeError
from .exceptions import TemplatesNotFound
from .exceptions import TemplateSyntaxError
from .exceptions import UndefinedError
from .lexer import get_lexer
from .lexer import Lexer
from .lexer import TokenStream
from .nodes import EvalContext
from .parser import Parser
from .runtime import Context
from .runtime import new_context
from .runtime import Undefined
from .utils import _PassArg
from .utils import concat
from .utils import consume
from .utils import import_string
from .utils import internalcode
from .utils import LRUCache
from .utils import missing
if t.TYPE_CHECKING:
import typing_extensions as te
from .bccache import BytecodeCache
from .ext import Extension
from .loaders import BaseLoader
_env_bound = t.TypeVar("_env_bound", bound="Environment")
@lru_cache(maxsize=10)
def get_spontaneous_environment(cls: t.Type[_env_bound], *args: t.Any) -> _env_bound:
env = cls(*args)
env.shared = True
return env
def create_cache(
size: int,
) -> t.Optional[t.MutableMapping[t.Tuple[weakref.ref, str], "Template"]]:
if size == 0:
return None
if size < 0:
return {}
return LRUCache(size)
def copy_cache(
cache: t.Optional[t.MutableMapping],
) -> t.Optional[t.MutableMapping[t.Tuple[weakref.ref, str], "Template"]]:
if cache is None:
return None
if type(cache) is dict:
return {}
return LRUCache(cache.capacity)
def load_extensions(
environment: "Environment",
extensions: t.Sequence[t.Union[str, t.Type["Extension"]]],
) -> t.Dict[str, "Extension"]:
result = {}
for extension in extensions:
if isinstance(extension, str):
extension = t.cast(t.Type["Extension"], import_string(extension))
result[extension.identifier] = extension(environment)
return result
def _environment_config_check(environment: "Environment") -> "Environment":
assert issubclass(
environment.undefined, Undefined
), "'undefined' must be a subclass of 'jinja2.Undefined'."
assert (
environment.block_start_string
!= environment.variable_start_string
!= environment.comment_start_string
), "block, variable and comment start strings must be different."
assert environment.newline_sequence in {
"\r",
"\r\n",
"\n",
}, "'newline_sequence' must be one of '\\n', '\\r\\n', or '\\r'."
return environment
class Environment:
sandboxed = False
overlayed = False
linked_to: t.Optional["Environment"] = None
shared = False
code_generator_class: t.Type["CodeGenerator"] = CodeGenerator
concat = "".join
context_class: t.Type[Context] = Context
template_class: t.Type["Template"]
def __init__(
self,
block_start_string: str = BLOCK_START_STRING,
block_end_string: str = BLOCK_END_STRING,
variable_start_string: str = VARIABLE_START_STRING,
variable_end_string: str = VARIABLE_END_STRING,
comment_start_string: str = COMMENT_START_STRING,
comment_end_string: str = COMMENT_END_STRING,
line_statement_prefix: t.Optional[str] = LINE_STATEMENT_PREFIX,
line_comment_prefix: t.Optional[str] = LINE_COMMENT_PREFIX,
trim_blocks: bool = TRIM_BLOCKS,
lstrip_blocks: bool = LSTRIP_BLOCKS,
newline_sequence: "te.Literal['\\n', '\\r\\n', '\\r']" = NEWLINE_SEQUENCE,
keep_trailing_newline: bool = KEEP_TRAILING_NEWLINE,
extensions: t.Sequence[t.Union[str, t.Type["Extension"]]] = (),
optimized: bool = True,
undefined: t.Type[Undefined] = Undefined,
finalize: t.Optional[t.Callable[..., t.Any]] = None,
autoescape: t.Union[bool, t.Callable[[t.Optional[str]], bool]] = False,
loader: t.Optional["BaseLoader"] = None,
cache_size: int = 400,
auto_reload: bool = True,
bytecode_cache: t.Optional["BytecodeCache"] = None,
enable_async: bool = False,
):
self.block_start_string = block_start_string
self.block_end_string = block_end_string
self.variable_start_string = variable_start_string
self.variable_end_string = variable_end_string
self.comment_start_string = comment_start_string
self.comment_end_string = comment_end_string
self.line_statement_prefix = line_statement_prefix
self.line_comment_prefix = line_comment_prefix
self.trim_blocks = trim_blocks
self.lstrip_blocks = lstrip_blocks
self.newline_sequence = newline_sequence
self.keep_trailing_newline = keep_trailing_newline
self.undefined: t.Type[Undefined] = undefined
self.optimized = optimized
self.finalize = finalize
self.autoescape = autoescape
self.filters = DEFAULT_FILTERS.copy()
self.tests = DEFAULT_TESTS.copy()
self.globals = DEFAULT_NAMESPACE.copy()
self.loader = loader
self.cache = create_cache(cache_size)
self.bytecode_cache = bytecode_cache
self.auto_reload = auto_reload
self.policies = DEFAULT_POLICIES.copy()
self.extensions = load_extensions(self, extensions)
self.is_async = enable_async
_environment_config_check(self)
def add_extension(self, extension: t.Union[str, t.Type["Extension"]]) -> None:
self.extensions.update(load_extensions(self, [extension]))
def extend(self, **attributes: t.Any) -> None:
for key, value in attributes.items():
if not hasattr(self, key):
setattr(self, key, value)
def overlay(
self,
block_start_string: str = missing,
block_end_string: str = missing,
variable_start_string: str = missing,
variable_end_string: str = missing,
comment_start_string: str = missing,
comment_end_string: str = missing,
line_statement_prefix: t.Optional[str] = missing,
line_comment_prefix: t.Optional[str] = missing,
trim_blocks: bool = missing,
lstrip_blocks: bool = missing,
newline_sequence: "te.Literal['\\n', '\\r\\n', '\\r']" = missing,
keep_trailing_newline: bool = missing,
extensions: t.Sequence[t.Union[str, t.Type["Extension"]]] = missing,
optimized: bool = missing,
undefined: t.Type[Undefined] = missing,
finalize: t.Optional[t.Callable[..., t.Any]] = missing,
autoescape: t.Union[bool, t.Callable[[t.Optional[str]], bool]] = missing,
loader: t.Optional["BaseLoader"] = missing,
cache_size: int = missing,
auto_reload: bool = missing,
bytecode_cache: t.Optional["BytecodeCache"] = missing,
enable_async: bool = False,
) -> "Environment":
args = dict(locals())
del args["self"], args["cache_size"], args["extensions"], args["enable_async"]
rv = object.__new__(self.__class__)
rv.__dict__.update(self.__dict__)
rv.overlayed = True
rv.linked_to = self
for key, value in args.items():
if value is not missing:
setattr(rv, key, value)
if cache_size is not missing:
rv.cache = create_cache(cache_size)
else:
rv.cache = copy_cache(self.cache)
rv.extensions = {}
for key, value in self.extensions.items():
rv.extensions[key] = value.bind(rv)
if extensions is not missing:
rv.extensions.update(load_extensions(rv, extensions))
if enable_async is not missing:
rv.is_async = enable_async
return _environment_config_check(rv)
@property
def lexer(self) -> Lexer:
return get_lexer(self)
def iter_extensions(self) -> t.Iterator["Extension"]:
return iter(sorted(self.extensions.values(), key=lambda x: x.priority))
def getitem(
self, obj: t.Any, argument: t.Union[str, t.Any]
) -> t.Union[t.Any, Undefined]:
try:
return obj[argument]
except (AttributeError, TypeError, LookupError):
if isinstance(argument, str):
try:
attr = str(argument)
except Exception:
pass
else:
try:
return getattr(obj, attr)
except AttributeError:
pass
return self.undefined(obj=obj, name=argument)
def getattr(self, obj: t.Any, attribute: str) -> t.Any:
try:
return getattr(obj, attribute)
except AttributeError:
pass
try:
return obj[attribute]
except (TypeError, LookupError, AttributeError):
return self.undefined(obj=obj, name=attribute)
def _filter_test_common(
self,
name: t.Union[str, Undefined],
value: t.Any,
args: t.Optional[t.Sequence[t.Any]],
kwargs: t.Optional[t.Mapping[str, t.Any]],
context: t.Optional[Context],
eval_ctx: t.Optional[EvalContext],
is_filter: bool,
) -> t.Any:
if is_filter:
env_map = self.filters
type_name = "filter"
else:
env_map = self.tests
type_name = "test"
func = env_map.get(name)
if func is None:
msg = f"No {type_name} named {name!r}."
if isinstance(name, Undefined):
try:
name._fail_with_undefined_error()
except Exception as e:
msg = f"{msg} ({e}; did you forget to quote the callable name?)"
raise TemplateRuntimeError(msg)
args = [value, *(args if args is not None else ())]
kwargs = kwargs if kwargs is not None else {}
pass_arg = _PassArg.from_obj(func)
if pass_arg is _PassArg.context:
if context is None:
raise TemplateRuntimeError(
f"Attempted to invoke a context {type_name} without context."
)
args.insert(0, context)
elif pass_arg is _PassArg.eval_context:
if eval_ctx is None:
if context is not None:
eval_ctx = context.eval_ctx
else:
eval_ctx = EvalContext(self)
args.insert(0, eval_ctx)
elif pass_arg is _PassArg.environment:
args.insert(0, self)
return func(*args, **kwargs)
def call_filter(
self,
name: str,
value: t.Any,
args: t.Optional[t.Sequence[t.Any]] = None,
kwargs: t.Optional[t.Mapping[str, t.Any]] = None,
context: t.Optional[Context] = None,
eval_ctx: t.Optional[EvalContext] = None,
) -> t.Any:
return self._filter_test_common(
name, value, args, kwargs, context, eval_ctx, True
)
def call_test(
self,
name: str,
value: t.Any,
args: t.Optional[t.Sequence[t.Any]] = None,
kwargs: t.Optional[t.Mapping[str, t.Any]] = None,
context: t.Optional[Context] = None,
eval_ctx: t.Optional[EvalContext] = None,
) -> t.Any:
return self._filter_test_common(
name, value, args, kwargs, context, eval_ctx, False
)
@internalcode
def parse(
self,
source: str,
name: t.Optional[str] = None,
filename: t.Optional[str] = None,
) -> nodes.Template:
try:
return self._parse(source, name, filename)
except TemplateSyntaxError:
self.handle_exception(source=source)
def _parse(
self, source: str, name: t.Optional[str], filename: t.Optional[str]
) -> nodes.Template:
return Parser(self, source, name, filename).parse()
def lex(
self,
source: str,
name: t.Optional[str] = None,
filename: t.Optional[str] = None,
) -> t.Iterator[t.Tuple[int, str, str]]:
source = str(source)
try:
return self.lexer.tokeniter(source, name, filename)
except TemplateSyntaxError:
self.handle_exception(source=source)
def preprocess(
self,
source: str,
name: t.Optional[str] = None,
filename: t.Optional[str] = None,
) -> str:
return reduce(
lambda s, e: e.preprocess(s, name, filename),
self.iter_extensions(),
str(source),
)
def _tokenize(
self,
source: str,
name: t.Optional[str],
filename: t.Optional[str] = None,
state: t.Optional[str] = None,
) -> TokenStream:
source = self.preprocess(source, name, filename)
stream = self.lexer.tokenize(source, name, filename, state)
for ext in self.iter_extensions():
stream = ext.filter_stream(stream)
if not isinstance(stream, TokenStream):
stream = TokenStream(stream, name, filename)
return stream
def _generate(
self,
source: nodes.Template,
name: t.Optional[str],
filename: t.Optional[str],
defer_init: bool = False,
) -> str:
return generate( source,
self,
name,
filename,
defer_init=defer_init,
optimized=self.optimized,
)
def _compile(self, source: str, filename: str) -> CodeType:
return compile(source, filename, "exec")
@typing.overload
def compile( self,
source: t.Union[str, nodes.Template],
name: t.Optional[str] = None,
filename: t.Optional[str] = None,
raw: "te.Literal[False]" = False,
defer_init: bool = False,
) -> CodeType:
...
@typing.overload
def compile(
self,
source: t.Union[str, nodes.Template],
name: t.Optional[str] = None,
filename: t.Optional[str] = None,
raw: "te.Literal[True]" = ...,
defer_init: bool = False,
) -> str:
...
@internalcode
def compile(
self,
source: t.Union[str, nodes.Template],
name: t.Optional[str] = None,
filename: t.Optional[str] = None,
raw: bool = False,
defer_init: bool = False,
) -> t.Union[str, CodeType]:
source_hint = None
try:
if isinstance(source, str):
source_hint = source
source = self._parse(source, name, filename)
source = self._generate(source, name, filename, defer_init=defer_init)
if raw:
return source
if filename is None:
filename = "<template>"
return self._compile(source, filename)
except TemplateSyntaxError:
self.handle_exception(source=source_hint)
def compile_expression(
self, source: str, undefined_to_none: bool = True
) -> "TemplateExpression":
parser = Parser(self, source, state="variable")
try:
expr = parser.parse_expression()
if not parser.stream.eos:
raise TemplateSyntaxError(
"chunk after expression", parser.stream.current.lineno, None, None
)
expr.set_environment(self)
except TemplateSyntaxError:
self.handle_exception(source=source)
body = [nodes.Assign(nodes.Name("result", "store"), expr, lineno=1)]
template = self.from_string(nodes.Template(body, lineno=1))
return TemplateExpression(template, undefined_to_none)
def compile_templates(
self,
target: t.Union[str, os.PathLike],
extensions: t.Optional[t.Collection[str]] = None,
filter_func: t.Optional[t.Callable[[str], bool]] = None,
zip: t.Optional[str] = "deflated",
log_function: t.Optional[t.Callable[[str], None]] = None,
ignore_errors: bool = True,
) -> None:
from .loaders import ModuleLoader
if log_function is None:
def log_function(x: str) -> None:
pass
assert log_function is not None
assert self.loader is not None, "No loader configured."
def write_file(filename: str, data: str) -> None:
if zip:
info = ZipInfo(filename)
info.external_attr = 0o755 << 16
info.create_system = 3
zip_file.writestr(info, data)
else:
with open(os.path.join(target, filename), "wb") as f:
f.write(data.encode("utf8"))
if zip is not None:
from zipfile import ZipFile, ZipInfo, ZIP_DEFLATED, ZIP_STORED
zip_file = ZipFile(
target, "w", dict(deflated=ZIP_DEFLATED, stored=ZIP_STORED)[zip]
)
log_function(f"Compiling into Zip archive {target!r}")
else:
if not os.path.isdir(target):
os.makedirs(target)
log_function(f"Compiling into folder {target!r}")
try:
for name in self.list_templates(extensions, filter_func):
source, filename, _ = self.loader.get_source(self, name)
try:
code = self.compile(source, name, filename, True, True)
except TemplateSyntaxError as e:
if not ignore_errors:
raise
log_function(f'Could not compile "{name}": {e}')
continue
filename = ModuleLoader.get_module_filename(name)
write_file(filename, code)
log_function(f'Compiled "{name}" as {filename}')
finally:
if zip:
zip_file.close()
log_function("Finished compiling templates")
def list_templates(
self,
extensions: t.Optional[t.Collection[str]] = None,
filter_func: t.Optional[t.Callable[[str], bool]] = None,
) -> t.List[str]:
assert self.loader is not None, "No loader configured."
names = self.loader.list_templates()
if extensions is not None:
if filter_func is not None:
raise TypeError(
"either extensions or filter_func can be passed, but not both"
)
def filter_func(x: str) -> bool:
return "." in x and x.rsplit(".", 1)[1] in extensions
if filter_func is not None:
names = [name for name in names if filter_func(name)]
return names
def handle_exception(self, source: t.Optional[str] = None) -> "te.NoReturn":
from .debug import rewrite_traceback_stack
raise rewrite_traceback_stack(source=source)
def join_path(self, template: str, parent: str) -> str:
return template
@internalcode
def _load_template(
self, name: str, globals: t.Optional[t.MutableMapping[str, t.Any]]
) -> "Template":
if self.loader is None:
raise TypeError("no loader for this environment specified")
cache_key = (weakref.ref(self.loader), name)
if self.cache is not None:
template = self.cache.get(cache_key)
if template is not None and (
not self.auto_reload or template.is_up_to_date
):
if globals:
template.globals.update(globals)
return template
template = self.loader.load(self, name, self.make_globals(globals))
if self.cache is not None:
self.cache[cache_key] = template
return template
@internalcode
def get_template(
self,
name: t.Union[str, "Template"],
parent: t.Optional[str] = None,
globals: t.Optional[t.MutableMapping[str, t.Any]] = None,
) -> "Template":
if isinstance(name, Template):
return name
if parent is not None:
name = self.join_path(name, parent)
return self._load_template(name, globals)
@internalcode
def select_template(
self,
names: t.Iterable[t.Union[str, "Template"]],
parent: t.Optional[str] = None,
globals: t.Optional[t.MutableMapping[str, t.Any]] = None,
) -> "Template":
if isinstance(names, Undefined):
names._fail_with_undefined_error()
if not names:
raise TemplatesNotFound(
message="Tried to select from an empty list of templates."
)
for name in names:
if isinstance(name, Template):
return name
if parent is not None:
name = self.join_path(name, parent)
try:
return self._load_template(name, globals)
except (TemplateNotFound, UndefinedError):
pass
raise TemplatesNotFound(names)
@internalcode
def get_or_select_template(
self,
template_name_or_list: t.Union[
str, "Template", t.List[t.Union[str, "Template"]]
],
parent: t.Optional[str] = None,
globals: t.Optional[t.MutableMapping[str, t.Any]] = None,
) -> "Template":
if isinstance(template_name_or_list, (str, Undefined)):
return self.get_template(template_name_or_list, parent, globals)
elif isinstance(template_name_or_list, Template):
return template_name_or_list
return self.select_template(template_name_or_list, parent, globals)
def from_string(
self,
source: t.Union[str, nodes.Template],
globals: t.Optional[t.MutableMapping[str, t.Any]] = None,
template_class: t.Optional[t.Type["Template"]] = None,
) -> "Template":
gs = self.make_globals(globals)
cls = template_class or self.template_class
return cls.from_code(self, self.compile(source), gs, None)
def make_globals(
self, d: t.Optional[t.MutableMapping[str, t.Any]]
) -> t.MutableMapping[str, t.Any]:
if d is None:
d = {}
return ChainMap(d, self.globals)
class Template:
environment_class: t.Type[Environment] = Environment
environment: Environment
globals: t.MutableMapping[str, t.Any]
name: t.Optional[str]
filename: t.Optional[str]
blocks: t.Dict[str, t.Callable[[Context], t.Iterator[str]]]
root_render_func: t.Callable[[Context], t.Iterator[str]]
_module: t.Optional["TemplateModule"]
_debug_info: str
_uptodate: t.Optional[t.Callable[[], bool]]
def __new__(
cls,
source: t.Union[str, nodes.Template],
block_start_string: str = BLOCK_START_STRING,
block_end_string: str = BLOCK_END_STRING,
variable_start_string: str = VARIABLE_START_STRING,
variable_end_string: str = VARIABLE_END_STRING,
comment_start_string: str = COMMENT_START_STRING,
comment_end_string: str = COMMENT_END_STRING,
line_statement_prefix: t.Optional[str] = LINE_STATEMENT_PREFIX,
line_comment_prefix: t.Optional[str] = LINE_COMMENT_PREFIX,
trim_blocks: bool = TRIM_BLOCKS,
lstrip_blocks: bool = LSTRIP_BLOCKS,
newline_sequence: "te.Literal['\\n', '\\r\\n', '\\r']" = NEWLINE_SEQUENCE,
keep_trailing_newline: bool = KEEP_TRAILING_NEWLINE,
extensions: t.Sequence[t.Union[str, t.Type["Extension"]]] = (),
optimized: bool = True,
undefined: t.Type[Undefined] = Undefined,
finalize: t.Optional[t.Callable[..., t.Any]] = None,
autoescape: t.Union[bool, t.Callable[[t.Optional[str]], bool]] = False,
enable_async: bool = False,
) -> t.Any: env = get_spontaneous_environment(
cls.environment_class, block_start_string,
block_end_string,
variable_start_string,
variable_end_string,
comment_start_string,
comment_end_string,
line_statement_prefix,
line_comment_prefix,
trim_blocks,
lstrip_blocks,
newline_sequence,
keep_trailing_newline,
frozenset(extensions),
optimized,
undefined, finalize,
autoescape,
None,
0,
False,
None,
enable_async,
)
return env.from_string(source, template_class=cls)
@classmethod
def from_code(
cls,
environment: Environment,
code: CodeType,
globals: t.MutableMapping[str, t.Any],
uptodate: t.Optional[t.Callable[[], bool]] = None,
) -> "Template":
namespace = {"environment": environment, "__file__": code.co_filename}
exec(code, namespace)
rv = cls._from_namespace(environment, namespace, globals)
rv._uptodate = uptodate
return rv
@classmethod
def from_module_dict(
cls,
environment: Environment,
module_dict: t.MutableMapping[str, t.Any],
globals: t.MutableMapping[str, t.Any],
) -> "Template":
return cls._from_namespace(environment, module_dict, globals)
@classmethod
def _from_namespace(
cls,
environment: Environment,
namespace: t.MutableMapping[str, t.Any],
globals: t.MutableMapping[str, t.Any],
) -> "Template":
t: "Template" = object.__new__(cls)
t.environment = environment
t.globals = globals
t.name = namespace["name"]
t.filename = namespace["__file__"]
t.blocks = namespace["blocks"]
t.root_render_func = namespace["root"] t._module = None
t._debug_info = namespace["debug_info"]
t._uptodate = None
namespace["environment"] = environment
namespace["__jinja_template__"] = t
return t
def render(self, *args: t.Any, **kwargs: t.Any) -> str:
if self.environment.is_async:
import asyncio
close = False
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
close = True
try:
return loop.run_until_complete(self.render_async(*args, **kwargs))
finally:
if close:
loop.close()
ctx = self.new_context(dict(*args, **kwargs))
try:
return self.environment.concat(self.root_render_func(ctx)) except Exception:
self.environment.handle_exception()
async def render_async(self, *args: t.Any, **kwargs: t.Any) -> str:
if not self.environment.is_async:
raise RuntimeError(
"The environment was not created with async mode enabled."
)
ctx = self.new_context(dict(*args, **kwargs))
try:
return self.environment.concat( [n async for n in self.root_render_func(ctx)] )
except Exception:
return self.environment.handle_exception()
def stream(self, *args: t.Any, **kwargs: t.Any) -> "TemplateStream":
return TemplateStream(self.generate(*args, **kwargs))
def generate(self, *args: t.Any, **kwargs: t.Any) -> t.Iterator[str]:
if self.environment.is_async:
import asyncio
async def to_list() -> t.List[str]:
return [x async for x in self.generate_async(*args, **kwargs)]
yield from asyncio.run(to_list())
return
ctx = self.new_context(dict(*args, **kwargs))
try:
yield from self.root_render_func(ctx) except Exception:
yield self.environment.handle_exception()
async def generate_async(
self, *args: t.Any, **kwargs: t.Any
) -> t.AsyncIterator[str]:
if not self.environment.is_async:
raise RuntimeError(
"The environment was not created with async mode enabled."
)
ctx = self.new_context(dict(*args, **kwargs))
try:
async for event in self.root_render_func(ctx): yield event
except Exception:
yield self.environment.handle_exception()
def new_context(
self,
vars: t.Optional[t.Dict[str, t.Any]] = None,
shared: bool = False,
locals: t.Optional[t.Mapping[str, t.Any]] = None,
) -> Context:
return new_context(
self.environment, self.name, self.blocks, vars, shared, self.globals, locals
)
def make_module(
self,
vars: t.Optional[t.Dict[str, t.Any]] = None,
shared: bool = False,
locals: t.Optional[t.Mapping[str, t.Any]] = None,
) -> "TemplateModule":
ctx = self.new_context(vars, shared, locals)
return TemplateModule(self, ctx)
async def make_module_async(
self,
vars: t.Optional[t.Dict[str, t.Any]] = None,
shared: bool = False,
locals: t.Optional[t.Mapping[str, t.Any]] = None,
) -> "TemplateModule":
ctx = self.new_context(vars, shared, locals)
return TemplateModule(
self, ctx, [x async for x in self.root_render_func(ctx)] )
@internalcode
def _get_default_module(self, ctx: t.Optional[Context] = None) -> "TemplateModule":
if self.environment.is_async:
raise RuntimeError("Module is not available in async mode.")
if ctx is not None:
keys = ctx.globals_keys - self.globals.keys()
if keys:
return self.make_module({k: ctx.parent[k] for k in keys})
if self._module is None:
self._module = self.make_module()
return self._module
async def _get_default_module_async(
self, ctx: t.Optional[Context] = None
) -> "TemplateModule":
if ctx is not None:
keys = ctx.globals_keys - self.globals.keys()
if keys:
return await self.make_module_async({k: ctx.parent[k] for k in keys})
if self._module is None:
self._module = await self.make_module_async()
return self._module
@property
def module(self) -> "TemplateModule":
return self._get_default_module()
def get_corresponding_lineno(self, lineno: int) -> int:
for template_line, code_line in reversed(self.debug_info):
if code_line <= lineno:
return template_line
return 1
@property
def is_up_to_date(self) -> bool:
if self._uptodate is None:
return True
return self._uptodate()
@property
def debug_info(self) -> t.List[t.Tuple[int, int]]:
if self._debug_info:
return [
tuple(map(int, x.split("="))) for x in self._debug_info.split("&")
]
return []
def __repr__(self) -> str:
if self.name is None:
name = f"memory:{id(self):x}"
else:
name = repr(self.name)
return f"<{type(self).__name__} {name}>"
class TemplateModule:
def __init__(
self,
template: Template,
context: Context,
body_stream: t.Optional[t.Iterable[str]] = None,
) -> None:
if body_stream is None:
if context.environment.is_async:
raise RuntimeError(
"Async mode requires a body stream to be passed to"
" a template module. Use the async methods of the"
" API you are using."
)
body_stream = list(template.root_render_func(context))
self._body_stream = body_stream
self.__dict__.update(context.get_exported())
self.__name__ = template.name
def __html__(self) -> Markup:
return Markup(concat(self._body_stream))
def __str__(self) -> str:
return concat(self._body_stream)
def __repr__(self) -> str:
if self.__name__ is None:
name = f"memory:{id(self):x}"
else:
name = repr(self.__name__)
return f"<{type(self).__name__} {name}>"
class TemplateExpression:
def __init__(self, template: Template, undefined_to_none: bool) -> None:
self._template = template
self._undefined_to_none = undefined_to_none
def __call__(self, *args: t.Any, **kwargs: t.Any) -> t.Optional[t.Any]:
context = self._template.new_context(dict(*args, **kwargs))
consume(self._template.root_render_func(context)) rv = context.vars["result"]
if self._undefined_to_none and isinstance(rv, Undefined):
rv = None
return rv
class TemplateStream:
def __init__(self, gen: t.Iterator[str]) -> None:
self._gen = gen
self.disable_buffering()
def dump(
self,
fp: t.Union[str, t.IO],
encoding: t.Optional[str] = None,
errors: t.Optional[str] = "strict",
) -> None:
close = False
if isinstance(fp, str):
if encoding is None:
encoding = "utf-8"
fp = open(fp, "wb")
close = True
try:
if encoding is not None:
iterable = (x.encode(encoding, errors) for x in self) else:
iterable = self
if hasattr(fp, "writelines"):
fp.writelines(iterable)
else:
for item in iterable:
fp.write(item)
finally:
if close:
fp.close()
def disable_buffering(self) -> None:
self._next = partial(next, self._gen)
self.buffered = False
def _buffered_generator(self, size: int) -> t.Iterator[str]:
buf: t.List[str] = []
c_size = 0
push = buf.append
while True:
try:
while c_size < size:
c = next(self._gen)
push(c)
if c:
c_size += 1
except StopIteration:
if not c_size:
return
yield concat(buf)
del buf[:]
c_size = 0
def enable_buffering(self, size: int = 5) -> None:
if size <= 1:
raise ValueError("buffer size too small")
self.buffered = True
self._next = partial(next, self._buffered_generator(size))
def __iter__(self) -> "TemplateStream":
return self
def __next__(self) -> str:
return self._next()
Environment.template_class = Template